Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1815cc7
Pass the new limit to the batch poster and use it in the batchSegment…
pmikolajczyk41 Nov 3, 2025
a4fc03f
Set the field for tests
pmikolajczyk41 Nov 3, 2025
6d128d9
Get rid of unnecessary usage of the limit
pmikolajczyk41 Nov 3, 2025
82172d7
Cover inbox usages and tests
pmikolajczyk41 Nov 3, 2025
b7aa16a
Cover MEL
pmikolajczyk41 Nov 3, 2025
94f18b1
Add accessor for the limit in the config
pmikolajczyk41 Nov 10, 2025
4730759
Pass chain config instead of arbitrum
pmikolajczyk41 Nov 10, 2025
75d679f
Avoid nil pointer dereference
pmikolajczyk41 Nov 10, 2025
5238ed6
Add rationale for nil
pmikolajczyk41 Nov 10, 2025
ab40ca2
update pin
pmikolajczyk41 Nov 10, 2025
75057a7
Merge remote-tracking branch 'origin/master' into pmikolajczyk/nit-31…
pmikolajczyk41 Nov 10, 2025
69cab03
fix forkid
pmikolajczyk41 Nov 10, 2025
9a11649
Merge remote-tracking branch 'origin/master' into pmikolajczyk/nit-31…
pmikolajczyk41 Nov 10, 2025
0a0d6af
Merge remote-tracking branch 'refs/remotes/origin/master' into pmikol…
pmikolajczyk41 Dec 5, 2025
776f1b5
Update fork id in test
pmikolajczyk41 Dec 5, 2025
82e8518
Leaving chain info JSON as before (consistent with on-chain info)
pmikolajczyk41 Dec 5, 2025
6ecd436
Add a test for configurable limit in batch poster
pmikolajczyk41 Dec 5, 2025
b078ad4
Add two tests for parsing
pmikolajczyk41 Dec 8, 2025
2e898fd
Lint
pmikolajczyk41 Dec 8, 2025
86125b0
ForkID
pmikolajczyk41 Dec 8, 2025
cdc8678
Merge branch 'master' into pmikolajczyk/nit-3121-uncompressed-batch-s…
pmikolajczyk41 Dec 8, 2025
1c4e943
Remove batch poster system test
pmikolajczyk41 Dec 9, 2025
dec7e98
Rename setup function
pmikolajczyk41 Dec 9, 2025
24a2d00
Make batch size limit arbos-version dependent
pmikolajczyk41 Dec 9, 2025
d8a5394
Use arbos version in batch poster
pmikolajczyk41 Dec 9, 2025
b4d5cce
Use arbos version in inbox and MEL
pmikolajczyk41 Dec 9, 2025
3da6e6f
Checkpoint
pmikolajczyk41 Dec 9, 2025
e4ed04f
checkpoint
pmikolajczyk41 Dec 9, 2025
03233dd
fix tests
pmikolajczyk41 Dec 9, 2025
fd6384f
nil protection
pmikolajczyk41 Dec 9, 2025
c3da851
update pin
pmikolajczyk41 Dec 9, 2025
c6856f1
Merge branch 'master' into pmikolajczyk/nit-3121-uncompressed-batch-s…
pmikolajczyk41 Dec 9, 2025
883a3e1
forbid changing the limit in the chain config
pmikolajczyk41 Dec 10, 2025
cc673af
Merge remote-tracking branch 'origin/master' into pmikolajczyk/nit-31…
pmikolajczyk41 Dec 10, 2025
93cff8b
tiny refactor in replay
pmikolajczyk41 Dec 10, 2025
f6dd87e
pass non-nil arbos version getter
pmikolajczyk41 Dec 10, 2025
449b987
Use the previous message index
pmikolajczyk41 Dec 10, 2025
a9d33d8
Merge remote-tracking branch 'origin/master' into pmikolajczyk/nit-31…
pmikolajczyk41 Dec 11, 2025
f1872b4
Fixed implementation
pmikolajczyk41 Dec 11, 2025
072f95e
Set correct version in test
pmikolajczyk41 Dec 11, 2025
c491e88
lint
pmikolajczyk41 Dec 11, 2025
4a1b615
Be more resilient
pmikolajczyk41 Dec 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 64 additions & 59 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,25 @@ type batchPosterPosition struct {

type BatchPoster struct {
stopwaiter.StopWaiter
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
arbOSVersionGetter execution.ArbOSVersionGetter
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
dapWriter daprovider.Writer
dapReaders *daprovider.ReaderRegistry
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
non4844BatchCount int // Count of consecutive non-4844 batches posted
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
arbOSVersionGetter execution.ArbOSVersionGetter
arbitrumChainParams *params.ArbitrumChainParams
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
syncMonitor *SyncMonitor
seqInboxABI *abi.ABI
seqInboxAddr common.Address
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
dapWriter daprovider.Writer
dapReaders *daprovider.ReaderRegistry
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
non4844BatchCount int // Count of consecutive non-4844 batches posted
// This is an atomic variable that should only be accessed atomically.
// An estimate of the number of batches we want to post but haven't yet.
// This doesn't include batches which we don't want to post yet due to the L1 bounds.
Expand Down Expand Up @@ -319,18 +320,19 @@ var TestBatchPosterConfig = BatchPosterConfig{
}

type BatchPosterOpts struct {
DataPosterDB ethdb.Database
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
VersionGetter execution.ArbOSVersionGetter
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAPWriter daprovider.Writer
ParentChainID *big.Int
DAPReaders *daprovider.ReaderRegistry
DataPosterDB ethdb.Database
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
VersionGetter execution.ArbOSVersionGetter
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
DAPWriter daprovider.Writer
ParentChainID *big.Int
DAPReaders *daprovider.ReaderRegistry
ArbitrumChainParams *params.ArbitrumChainParams
}

func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, error) {
Expand Down Expand Up @@ -373,23 +375,24 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
return nil, err
}
b := &BatchPoster{
l1Reader: opts.L1Reader,
inbox: opts.Inbox,
streamer: opts.Streamer,
arbOSVersionGetter: opts.VersionGetter,
syncMonitor: opts.SyncMonitor,
config: opts.Config,
seqInbox: seqInbox,
seqInboxABI: seqInboxABI,
seqInboxAddr: opts.DeployInfo.SequencerInbox,
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
dapWriter: opts.DAPWriter,
redisLock: redisLock,
dapReaders: opts.DAPReaders,
parentChain: &parent.ParentChain{ChainID: opts.ParentChainID, L1Reader: opts.L1Reader},
checkEip7623: checkEip7623,
useEip7623: useEip7623,
l1Reader: opts.L1Reader,
inbox: opts.Inbox,
streamer: opts.Streamer,
arbOSVersionGetter: opts.VersionGetter,
arbitrumChainParams: opts.ArbitrumChainParams,
syncMonitor: opts.SyncMonitor,
config: opts.Config,
seqInbox: seqInbox,
seqInboxABI: seqInboxABI,
seqInboxAddr: opts.DeployInfo.SequencerInbox,
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
dapWriter: opts.DAPWriter,
redisLock: redisLock,
dapReaders: opts.DAPReaders,
parentChain: &parent.ParentChain{ChainID: opts.ParentChainID, L1Reader: opts.L1Reader},
checkEip7623: checkEip7623,
useEip7623: useEip7623,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
if err != nil {
Expand Down Expand Up @@ -875,6 +878,7 @@ type batchSegments struct {
recompressionLevel int
newUncompressedSize int
totalUncompressedSize int
maxUncompressedSize int
lastCompressedSize int
trailingHeaders int // how many trailing segments are headers
isDone bool
Expand Down Expand Up @@ -934,12 +938,13 @@ func (b *BatchPoster) newBatchSegments(ctx context.Context, firstDelayed uint64,
recompressionLevel = compressionLevel
}
return &batchSegments{
compressedBuffer: compressedBuffer,
compressedWriter: brotli.NewWriterLevel(compressedBuffer, compressionLevel),
sizeLimit: maxSize,
recompressionLevel: recompressionLevel,
rawSegments: make([][]byte, 0, 128),
delayedMsg: firstDelayed,
compressedBuffer: compressedBuffer,
compressedWriter: brotli.NewWriterLevel(compressedBuffer, compressionLevel),
sizeLimit: maxSize,
recompressionLevel: recompressionLevel,
rawSegments: make([][]byte, 0, 128),
delayedMsg: firstDelayed,
maxUncompressedSize: int(b.arbitrumChainParams.MaxUncompressedBatchSize), // #nosec G115
}, nil
}

Expand All @@ -954,8 +959,8 @@ func (s *batchSegments) recompressAll() error {
return err
}
}
if s.totalUncompressedSize > arbstate.MaxDecompressedLen {
return fmt.Errorf("batch size %v exceeds maximum decompressed length %v", s.totalUncompressedSize, arbstate.MaxDecompressedLen)
if s.totalUncompressedSize > s.maxUncompressedSize {
return fmt.Errorf("batch size %v exceeds maximum uncompressed length %v", s.totalUncompressedSize, s.maxUncompressedSize)
}
if len(s.rawSegments) >= arbstate.MaxSegmentsPerSequencerMessage {
return fmt.Errorf("number of raw segments %v excees maximum number %v", len(s.rawSegments), arbstate.MaxSegmentsPerSequencerMessage)
Expand All @@ -965,10 +970,10 @@ func (s *batchSegments) recompressAll() error {

func (s *batchSegments) testForOverflow(isHeader bool) (bool, error) {
// we've reached the max decompressed size
if s.totalUncompressedSize > arbstate.MaxDecompressedLen {
log.Info("Batch full: max decompressed length exceeded",
if s.totalUncompressedSize > s.maxUncompressedSize {
log.Info("Batch full: max uncompressed length exceeded",
"current", s.totalUncompressedSize,
"max", arbstate.MaxDecompressedLen,
"max", s.maxUncompressedSize,
"isHeader", isHeader)
return true, nil
}
Expand Down Expand Up @@ -1841,7 +1846,7 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
b.building.muxBackend.seqMsg = seqMsg
b.building.muxBackend.delayedInboxStart = batchPosition.DelayedMessageCount
b.building.muxBackend.SetPositionWithinMessage(0)
simMux := arbstate.NewInboxMultiplexer(b.building.muxBackend, batchPosition.DelayedMessageCount, dapReaders, daprovider.KeysetValidate)
simMux := arbstate.NewInboxMultiplexer(b.building.muxBackend, batchPosition.DelayedMessageCount, dapReaders, daprovider.KeysetValidate, b.arbitrumChainParams) // nolint:gosec
log.Debug("Begin checking the correctness of batch against inbox multiplexer", "startMsgSeqNum", batchPosition.MessageCount, "endMsgSeqNum", b.building.msgCount-1)
for i := batchPosition.MessageCount; i < b.building.msgCount; i++ {
msg, err := simMux.Pop(ctx)
Expand Down
8 changes: 7 additions & 1 deletion arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,13 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client *ethclien
ctx: ctx,
client: client,
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.dapReaders, daprovider.KeysetValidate)
multiplexer := arbstate.NewInboxMultiplexer(
backend,
prevbatchmeta.DelayedMessageCount,
t.dapReaders,
daprovider.KeysetValidate,
&t.txStreamer.chainConfig.ArbitrumChainParams,
)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentPos := prevbatchmeta.MessageCount + 1
for {
Expand Down
5 changes: 5 additions & 0 deletions arbnode/mel/extraction/message_extraction_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"

"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/arbos/arbostypes"
Expand Down Expand Up @@ -55,11 +56,13 @@ func ExtractMessages(
delayedMsgDatabase DelayedMessageDatabase,
receiptFetcher ReceiptFetcher,
txsFetcher TransactionsFetcher,
arbitrumChainParams *params.ArbitrumChainParams,
) (*mel.State, []*arbostypes.MessageWithMetadata, []*mel.DelayedInboxMessage, error) {
return extractMessagesImpl(
ctx,
inputState,
parentChainHeader,
arbitrumChainParams,
dataProviders,
delayedMsgDatabase,
txsFetcher,
Expand All @@ -81,6 +84,7 @@ func extractMessagesImpl(
ctx context.Context,
inputState *mel.State,
parentChainHeader *types.Header,
arbitrumChainParams *params.ArbitrumChainParams,
dataProviders *daprovider.ReaderRegistry,
delayedMsgDatabase DelayedMessageDatabase,
txsFetcher TransactionsFetcher,
Expand Down Expand Up @@ -200,6 +204,7 @@ func extractMessagesImpl(
serialized,
dataProviders,
daprovider.KeysetValidate,
arbitrumChainParams,
)
if err != nil {
return nil, nil, nil, err
Expand Down
8 changes: 7 additions & 1 deletion arbnode/mel/extraction/message_extraction_function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"

"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/daprovider"
)

Expand All @@ -31,7 +33,7 @@ func TestExtractMessages(t *testing.T) {
lookupDelayedMsgs func(context.Context, *mel.State, *types.Header, ReceiptFetcher, TransactionsFetcher) ([]*mel.DelayedInboxMessage, error)
serializer func(context.Context, *mel.SequencerInboxBatch, *types.Transaction, uint, ReceiptFetcher) ([]byte, error)
parseReport func(io.Reader) (*big.Int, common.Address, common.Hash, uint64, *big.Int, uint64, error)
parseSequencerMsg func(context.Context, uint64, common.Hash, []byte, *daprovider.ReaderRegistry, daprovider.KeysetValidationMode) (*arbstate.SequencerMessage, error)
parseSequencerMsg func(context.Context, uint64, common.Hash, []byte, *daprovider.ReaderRegistry, daprovider.KeysetValidationMode, *params.ArbitrumChainParams) (*arbstate.SequencerMessage, error)
extractBatchMessages func(context.Context, *mel.State, *arbstate.SequencerMessage, DelayedMessageDatabase) ([]*arbostypes.MessageWithMetadata, error)
expectedError string
expectedMsgCount uint64
Expand Down Expand Up @@ -150,13 +152,15 @@ func TestExtractMessages(t *testing.T) {
nil,
nil,
txsFetcher,
&chaininfo.ArbitrumDevTestChainConfig().ArbitrumChainParams,
)
} else {
// Test the internal extractMessagesImpl function
postState, messages, delayedMessages, err = extractMessagesImpl(
ctx,
melState,
header,
&chaininfo.ArbitrumDevTestChainConfig().ArbitrumChainParams,
nil,
nil,
txsFetcher,
Expand Down Expand Up @@ -321,6 +325,7 @@ func successfulParseSequencerMsg(
data []byte,
dapReaders *daprovider.ReaderRegistry,
keysetValidationMode daprovider.KeysetValidationMode,
arbitrumChainParams *params.ArbitrumChainParams,
) (*arbstate.SequencerMessage, error) {
return nil, nil
}
Expand All @@ -332,6 +337,7 @@ func failingParseSequencerMsg(
data []byte,
dapReaders *daprovider.ReaderRegistry,
keysetValidationMode daprovider.KeysetValidationMode,
arbitrumChainParams *params.ArbitrumChainParams,
) (*arbstate.SequencerMessage, error) {
return nil, errors.New("failed to parse sequencer message")
}
Expand Down
2 changes: 2 additions & 0 deletions arbnode/mel/extraction/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"

"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/arbos/arbostypes"
Expand Down Expand Up @@ -64,6 +65,7 @@ type sequencerMessageParserFunc func(
data []byte,
dapReaders *daprovider.ReaderRegistry,
keysetValidationMode daprovider.KeysetValidationMode,
arbitrumChainParams *params.ArbitrumChainParams,
) (*arbstate.SequencerMessage, error)

// Defines a function that can extract messages from a batch.
Expand Down
5 changes: 5 additions & 0 deletions arbnode/mel/runner/mel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"

"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/arbnode/mel/extraction"
Expand All @@ -37,6 +38,7 @@ type ParentChainReader interface {
type MessageExtractor struct {
stopwaiter.StopWaiter
parentChainReader ParentChainReader
arbitrumChainParams *params.ArbitrumChainParams
addrs *chaininfo.RollupAddresses
melDB *Database
msgConsumer mel.MessageConsumer
Expand All @@ -51,6 +53,7 @@ type MessageExtractor struct {
// to be used when extracting messages from the parent chain.
func NewMessageExtractor(
parentChainReader ParentChainReader,
arbitrumChainParams *params.ArbitrumChainParams,
rollupAddrs *chaininfo.RollupAddresses,
melDB *Database,
msgConsumer mel.MessageConsumer,
Expand All @@ -67,6 +70,7 @@ func NewMessageExtractor(
}
return &MessageExtractor{
parentChainReader: parentChainReader,
arbitrumChainParams: arbitrumChainParams,
addrs: rollupAddrs,
melDB: melDB,
msgConsumer: msgConsumer,
Expand Down Expand Up @@ -223,6 +227,7 @@ func (m *MessageExtractor) Act(ctx context.Context) (time.Duration, error) {
m.melDB,
receiptFetcher,
txsFetcher,
m.arbitrumChainParams,
)
if err != nil {
return m.retryInterval, err
Expand Down
1 change: 1 addition & 0 deletions arbnode/mel/runner/mel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestMessageExtractor(t *testing.T) {
messageConsumer := &mockMessageConsumer{}
extractor, err := NewMessageExtractor(
parentChainReader,
&chaininfo.ArbitrumDevTestChainConfig().ArbitrumChainParams,
&chaininfo.RollupAddresses{},
melDb,
messageConsumer,
Expand Down
28 changes: 15 additions & 13 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ func getBatchPoster(
ctx context.Context,
config *Config,
configFetcher ConfigFetcher,
l2Config *params.ChainConfig,
txOptsBatchPoster *bind.TransactOpts,
dapWriter daprovider.Writer,
l1Reader *headerreader.HeaderReader,
Expand Down Expand Up @@ -952,18 +953,19 @@ func getBatchPoster(
}
var err error
batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Inbox: inboxTracker,
Streamer: txStreamer,
VersionGetter: arbOSVersionGetter,
SyncMonitor: syncMonitor,
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAPWriter: dapWriter,
ParentChainID: parentChainID,
DAPReaders: dapReaders,
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
Inbox: inboxTracker,
Streamer: txStreamer,
VersionGetter: arbOSVersionGetter,
SyncMonitor: syncMonitor,
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
DAPWriter: dapWriter,
ParentChainID: parentChainID,
DAPReaders: dapReaders,
ArbitrumChainParams: &l2Config.ArbitrumChainParams,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -1160,7 +1162,7 @@ func createNodeImpl(
return nil, err
}

batchPoster, err := getBatchPoster(ctx, config, configFetcher, txOptsBatchPoster, dapWriter, l1Reader, inboxTracker, txStreamer, arbOSVersionGetter, arbDb, syncMonitor, deployInfo, parentChainID, dapReaders, stakerAddr)
batchPoster, err := getBatchPoster(ctx, config, configFetcher, l2Config, txOptsBatchPoster, dapWriter, l1Reader, inboxTracker, txStreamer, arbOSVersionGetter, arbDb, syncMonitor, deployInfo, parentChainID, dapReaders, stakerAddr)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading