Commit 0e415fb

op-batcher: introduce moreComing and conditionally register L1 block
1 parent dd69913 commit 0e415fb

File tree

op-acceptance-tests/tests/batcher/batcher_test.go
op-batcher/batcher/channel_manager.go
op-batcher/batcher/driver.go

3 files changed: +18 -20 lines

op-acceptance-tests/tests/batcher/batcher_test.go

Lines changed: 4 additions & 11 deletions
@@ -49,6 +49,7 @@ func TestBatcherFullChannelsAfterDowntime(gt *testing.T) {
 		parent = sys.L2CL.HeadBlockRef(types.LocalUnsafe).Hash
 
 		sys.AdvanceTime(time.Second * 2)
+		time.Sleep(20 * time.Millisecond) // failed to force-include tx: type: 2 sender; err: nonce too high
 	}
 
 	l.Debug("Sequencing L1 block", "iteration_j", j)
@@ -68,7 +69,7 @@ func TestBatcherFullChannelsAfterDowntime(gt *testing.T) {
 		l.Info("Channel details", "channelID", c.String(), "frameCount", len(channelFrames[c]), "dataLength_frame0", len(channelFrames[c][0].Data))
 	}
 
-	require.Equal(t, 10, len(channels)) // we expect a total of 10 channels, due to existing MaxChannelDuration bug filed at: https://github.com/ethereum-optimism/optimism/issues/18092
+	require.Equal(t, 2, len(channels)) // we expect a total of 2 channels
 
 	// values are dependent on:
 	// - MaxPendingTransactions
@@ -79,16 +80,8 @@ func TestBatcherFullChannelsAfterDowntime(gt *testing.T) {
 		max  int
 		note string
 	}{
-		{min: 7_000, max: 10_000, note: "channel 0 - the first 100 blocks"},
-		{min: 100, max: 1000, note: "channel 1 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
-		{min: 100, max: 1000, note: "channel 2 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
-		{min: 100, max: 1000, note: "channel 3 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
-		{min: 100, max: 1000, note: "channel 4 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
-		{min: 100, max: 1000, note: "channel 5 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
-		{min: 100, max: 1000, note: "channel 6 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
-		{min: 100, max: 1000, note: "channel 7 - only a few blocks due to racy behavior, nowhere near the channel capacity"},
-		{min: 20_000, max: 40_000, note: "channel 8 - filled to the max capacity, due to blocking behavior because of MaxPendingTransactions limit reached"},
-		{min: 20_000, max: 40_000, note: "channel 9 - filled to the max capacity, due to blocking behavior because of MaxPendingTransactions limit reached"},
+		{min: 30_000, max: 40_000, note: "channel 0 - filled to the max capacity"},
+		{min: 30_000, max: 40_000, note: "channel 1 - remaining data, filling channel close to max capacity"},
 	}
 
 	for i, entry := range sizeRanges {

op-batcher/batcher/channel_manager.go

Lines changed: 9 additions & 4 deletions
@@ -224,8 +224,8 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) {
 // It will decide whether to switch DA type automatically.
 // When switching DA type, the channelManager state will be rebuilt
 // with a new ChannelConfig.
-func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool, isThrottling bool, forcePublish pubInfo) (txData, error) {
-	channel, err := s.getReadyChannel(l1Head, forcePublish)
+func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool, isThrottling bool, pi pubInfo) (txData, error) {
+	channel, err := s.getReadyChannel(l1Head, pi)
 	if err != nil {
 		return emptyTxData, err
 	}
@@ -259,7 +259,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool, isThrottling
 		s.defaultCfg = newCfg
 
 		// Try again to get data to send on chain.
-		channel, err = s.getReadyChannel(l1Head, forcePublish)
+		channel, err = s.getReadyChannel(l1Head, pi)
 		if err != nil {
 			return emptyTxData, err
 		}
@@ -318,7 +318,12 @@ func (s *channelManager) getReadyChannel(l1Head eth.BlockID, pi pubInfo) (*chann
 		return nil, err
 	}
 
-	s.registerL1Block(l1Head)
+	if !pi.moreComing {
+		// Register current L1 head only after all pending blocks have been
+		// processed. Even if a timeout will be triggered now, it is better to have
+		// all pending blocks be included in this channel for submission.
+		s.registerL1Block(l1Head)
+	}
 
 	if err := s.outputFrames(); err != nil {
 		return nil, err
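
The key behavioral change is in getReadyChannel: the L1 head is now registered (which is what advances the channel-duration timeout clock) only once no further blocks are pending. Below is a minimal, self-contained Go sketch of that pattern. The pubInfo struct shape is an assumption based on the two fields visible in this diff (forcePublish, moreComing), and the channelManager here is a toy stand-in, not the real op-batcher type.

package main

import "fmt"

// pubInfo as assumed from this diff; the real struct in op-batcher may carry
// additional fields.
type pubInfo struct {
	forcePublish bool // publish even if channels are not yet full
	moreComing   bool // more pending blocks are still queued behind this call
}

// blockID stands in for eth.BlockID in this sketch.
type blockID struct{ Number uint64 }

// channelManager is a toy stand-in: it only tracks the last registered L1 head,
// which is what drives the channel-duration timeout in the real batcher.
type channelManager struct{ lastL1Head blockID }

func (s *channelManager) registerL1Block(l1Head blockID) {
	s.lastL1Head = l1Head
	fmt.Printf("registered L1 head %d (timeout clock advances)\n", l1Head.Number)
}

// getReadyChannel shows the pattern introduced by this commit: only register
// the L1 head once all pending blocks have been processed, so a timeout that
// triggers now still ships every pending block in the current channel.
func (s *channelManager) getReadyChannel(l1Head blockID, pi pubInfo) {
	if !pi.moreComing {
		s.registerL1Block(l1Head)
	} else {
		fmt.Printf("L1 head %d not registered: more blocks coming\n", l1Head.Number)
	}
}

func main() {
	mgr := &channelManager{}
	mgr.getReadyChannel(blockID{Number: 100}, pubInfo{moreComing: true})  // deferred
	mgr.getReadyChannel(blockID{Number: 101}, pubInfo{moreComing: false}) // registered
}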

op-batcher/batcher/driver.go

Lines changed: 5 additions & 5 deletions
@@ -501,7 +501,7 @@ func (l *BatchSubmitter) publishingLoop(ctx context.Context, wg *sync.WaitGroup,
 
 	for pi := range l.publishSignal {
 		l.Log.Debug("publishing loop received signal", "force_publish", pi.forcePublish)
-		l.publishStateToL1(ctx, txQueue, receiptsCh, daGroup, pi.forcePublish)
+		l.publishStateToL1(ctx, txQueue, receiptsCh, daGroup, pi)
 	}
 
 	// First wait for all DA requests to finish to prevent new transactions being queued
@@ -771,7 +771,7 @@ func (l *BatchSubmitter) waitNodeSync() error {
 
 // publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is no more data to
 // queue for publishing or if there was an error queuing the data.
-func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group, forcePublish bool) {
+func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group, pi pubInfo) {
 	for {
 		select {
 		case <-ctx.Done():
@@ -788,7 +788,7 @@ func (l *BatchSubmitter) publishStateToL1(ctx context.Context, queue *txmgr.Queu
 			return
 		}
 
-		err := l.publishTxToL1(ctx, queue, receiptsCh, daGroup, forcePublish)
+		err := l.publishTxToL1(ctx, queue, receiptsCh, daGroup, pi)
 		if err != nil {
 			if err != io.EOF {
 				l.Log.Error("Error publishing tx to l1", "err", err)
@@ -842,7 +842,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
 }
 
 // publishTxToL1 submits a single state tx to the L1
-func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group, forcePublish bool) error {
+func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group, pi pubInfo) error {
 	// send all available transactions
 	l1tip, isPectra, err := l.l1Tip(ctx)
 	if err != nil {
@@ -855,7 +855,7 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
 	// Collect next transaction data. This pulls data out of the channel, so we need to make sure
 	// to put it back if ever da or txmgr requests fail, by calling l.recordFailedDARequest/recordFailedTx.
 	l.channelMgrMutex.Lock()
-	txdata, err := l.channelMgr.TxData(l1tip.ID(), isPectra, params.IsThrottling(), pubInfo{forcePublish: forcePublish})
+	txdata, err := l.channelMgr.TxData(l1tip.ID(), isPectra, params.IsThrottling(), pi)
 	l.channelMgrMutex.Unlock()
 
 	if err == io.EOF {
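
On the driver side the change is pure plumbing: the pubInfo value received from the publish signal is handed down the call chain intact instead of being flattened to a bare forcePublish bool, so channelMgr.TxData can also observe moreComing. A simplified, self-contained sketch of that flow follows; names mirror the diff, but the bodies are heavily reduced and the real signatures with txmgr/errgroup arguments are omitted.

package main

import "fmt"

// Same assumed pubInfo shape as in the earlier sketch.
type pubInfo struct {
	forcePublish bool
	moreComing   bool
}

// publishingLoop sketch: each signal's full pubInfo is passed down unchanged,
// where previously only pi.forcePublish was forwarded.
func publishingLoop(publishSignal <-chan pubInfo) {
	for pi := range publishSignal {
		publishStateToL1(pi)
	}
}

func publishStateToL1(pi pubInfo) { publishTxToL1(pi) }

func publishTxToL1(pi pubInfo) {
	// Stand-in for channelMgr.TxData(l1tip.ID(), isPectra, isThrottling, pi).
	fmt.Printf("TxData sees forcePublish=%v moreComing=%v\n", pi.forcePublish, pi.moreComing)
}

func main() {
	sig := make(chan pubInfo, 2)
	sig <- pubInfo{forcePublish: true, moreComing: true}
	sig <- pubInfo{forcePublish: true, moreComing: false}
	close(sig)
	publishingLoop(sig)
}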
