Skip to content

Commit 32e3a8b

Browse files
committed
op-batcher: introduce moreComing and conditionally register L1 block
1 parent 828c2dc commit 32e3a8b

File tree

3 files changed

+22
-23
lines changed

3 files changed

+22
-23
lines changed

op-acceptance-tests/tests/batcher/batcher_test.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ func TestBatcherFullChannelsAfterDowntime(gt *testing.T) {
3737
l.Info("Latest unsafe block after stopping the L2 sequencer", "latestUnsafe", latestUnsafe_A)
3838

3939
parent := latestUnsafe_A
40+
nonce := uint64(0)
4041
for j := 0; j < 200; j++ {
4142
l1Origin := sys.L1EL.BlockRefByLabel(eth.Unsafe).Hash
4243

4344
for i := 0; i < 5; i++ {
4445
l.Debug("Sequencing L2 block", "iteration", i, "parent", parent)
45-
sequenceBlockWithL1Origin(t, ts_L2, parent, l1Origin, alice, cathrine)
46+
sequenceBlockWithL1Origin(t, ts_L2, parent, l1Origin, alice, cathrine, nonce)
47+
nonce++
4648

4749
parent = sys.L2CL.HeadBlockRef(types.LocalUnsafe).Hash
4850

@@ -66,7 +68,7 @@ func TestBatcherFullChannelsAfterDowntime(gt *testing.T) {
6668
l.Info("Channel details", "channelID", c.String(), "frameCount", len(channelFrames[c]), "dataLength_frame0", len(channelFrames[c][0].Data))
6769
}
6870

69-
require.Equal(t, 10, len(channels)) // we expect a total of 10 channels, due to existing MaxChannelDuration bug filed at: https://github.com/ethereum-optimism/optimism/issues/18092
71+
require.Equal(t, 2, len(channels)) // we expect a total of 2 channels
7072

7173
// values are dependent on:
7274
// - MaxPendingTransactions
@@ -77,16 +79,8 @@ func TestBatcherFullChannelsAfterDowntime(gt *testing.T) {
7779
max int
7880
note string
7981
}{
80-
{min: 7_000, max: 10_000, note: "channel 0 - the first 100 blocks"},
81-
{min: 100, max: 1000, note: "channel 1 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
82-
{min: 100, max: 1000, note: "channel 2 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
83-
{min: 100, max: 1000, note: "channel 3 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
84-
{min: 100, max: 1000, note: "channel 4 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
85-
{min: 100, max: 1000, note: "channel 5 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
86-
{min: 100, max: 1000, note: "channel 6 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
87-
{min: 100, max: 1000, note: "channel 7 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
88-
{min: 20_000, max: 40_000, note: "channel 8 - filled to the max capacity, due to blocking behavior because of MaxPendingTransactions limit reached"},
89-
{min: 20_000, max: 40_000, note: "channel 9 - filled to the max capacity, due to blocking behavior because of MaxPendingTransactions limit reached"},
82+
{min: 30_000, max: 40_000, note: "channel 0 - filled to the max capacity"},
83+
{min: 30_000, max: 40_000, note: "channel 1 - remaining data, filling channel close to max capacity"},
9084
}
9185

9286
for i, entry := range sizeRanges {
@@ -109,13 +103,13 @@ func sequenceBlock(t devtest.T, ts apis.TestSequencerControlAPI, parent common.H
109103
require.NoError(t, ts.Next(t.Ctx()))
110104
}
111105

112-
func sequenceBlockWithL1Origin(t devtest.T, ts apis.TestSequencerControlAPI, parent common.Hash, l1Origin common.Hash, alice *dsl.EOA, cathrine *dsl.EOA) {
106+
func sequenceBlockWithL1Origin(t devtest.T, ts apis.TestSequencerControlAPI, parent common.Hash, l1Origin common.Hash, alice *dsl.EOA, cathrine *dsl.EOA, nonce uint64) {
113107
require.NoError(t, ts.New(t.Ctx(), seqtypes.BuildOpts{Parent: parent, L1Origin: &l1Origin}))
114108

115109
// include simple transfer tx in opened block
116110
{
117111
to := cathrine.PlanTransfer(alice.Address(), eth.OneWei)
118-
opt := txplan.Combine(to)
112+
opt := txplan.Combine(to, txplan.WithStaticNonce(nonce))
119113
ptx := txplan.NewPlannedTx(opt)
120114
signed_tx, err := ptx.Signed.Eval(t.Ctx())
121115
require.NoError(t, err, "Expected to be able to evaluate a planned transaction on op-test-sequencer, but got error")

op-batcher/batcher/channel_manager.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
224224
// It will decide whether to switch DA type automatically.
225225
// When switching DA type, the channelManager state will be rebuilt
226226
// with a new ChannelConfig.
227-
func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool, isThrottling bool, forcePublish pubInfo) (txData, error) {
228-
channel, err := s.getReadyChannel(l1Head, forcePublish)
227+
func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool, isThrottling bool, pi pubInfo) (txData, error) {
228+
channel, err := s.getReadyChannel(l1Head, pi)
229229
if err != nil {
230230
return emptyTxData, err
231231
}
@@ -259,7 +259,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool, isThrottling
259259
s.defaultCfg = newCfg
260260

261261
// Try again to get data to send on chain.
262-
channel, err = s.getReadyChannel(l1Head, forcePublish)
262+
channel, err = s.getReadyChannel(l1Head, pi)
263263
if err != nil {
264264
return emptyTxData, err
265265
}
@@ -318,7 +318,12 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID, pi pubInfo) (*chann
318318
return nil, err
319319
}
320320

321-
s.registerL1Block(l1Head)
321+
if !pi.moreComing {
322+
// Register current L1 head only after all pending blocks have been
323+
// processed. Even if a timeout will be triggered now, it is better to have
324+
// all pending blocks be included in this channel for submission.
325+
s.registerL1Block(l1Head)
326+
}
322327

323328
if err := s.outputFrames(); err != nil {
324329
return nil, err

op-batcher/batcher/driver.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ func (l *BatchSubmitter) publishingLoop(ctx context.Context) {
512512

513513
for pi := range l.publishSignal {
514514
l.Log.Debug("publishing loop received signal", "force_publish", pi.forcePublish)
515-
l.publishStateToL1(ctx, txQueue, daGroup, pi.forcePublish)
515+
l.publishStateToL1(ctx, txQueue, daGroup, pi)
516516
}
517517

518518
// First wait for all DA requests to finish to prevent new transactions being queued
@@ -779,7 +779,7 @@ func (l *BatchSubmitter) waitNodeSync() error {
779779

780780
// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is no more data to
781781
// queue for publishing or if there was an error queuing the data.
782-
func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txRef], daGroup *errgroup.Group, forcePublish bool) {
782+
func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txRef], daGroup *errgroup.Group, pi pubInfo) {
783783
for {
784784
select {
785785
case <-ctx.Done():
@@ -796,7 +796,7 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queu
796796
return
797797
}
798798

799-
err := l.publishTxToL1(ctx, queue, daGroup, forcePublish)
799+
err := l.publishTxToL1(ctx, queue, daGroup, pi)
800800
if err != nil {
801801
if err != io.EOF {
802802
l.Log.Error("Error publishing tx to l1", "err", err)
@@ -850,7 +850,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
850850
}
851851

852852
// publishTxToL1 submits a single state tx to the L1
853-
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], daGroup *errgroup.Group, forcePublish bool) error {
853+
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], daGroup *errgroup.Group, pi pubInfo) error {
854854
// send all available transactions
855855
l1tip, isPectra, err := l.l1Tip(ctx)
856856
if err != nil {
@@ -863,7 +863,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
863863
// Collect next transaction data. This pulls data out of the channel, so we need to make sure
864864
// to put it back if ever da or txmgr requests fail, by calling l.recordFailedDARequest/recordFailedTx.
865865
l.channelMgrMutex.Lock()
866-
txdata, err := l.channelMgr.TxData(l1tip.ID(), isPectra, params.IsThrottling(), pubInfo{forcePublish: forcePublish})
866+
txdata, err := l.channelMgr.TxData(l1tip.ID(), isPectra, params.IsThrottling(), pi)
867867
l.channelMgrMutex.Unlock()
868868

869869
if err == io.EOF {

0 commit comments

Comments
 (0)