Skip to content

Commit b64ead9

Browse files
committed
op-batcher: extract l.wg away from actor goroutines
1 parent b662254 commit b64ead9

File tree

2 files changed

+29
-17
lines changed

2 files changed

+29
-17
lines changed

op-batcher/batcher/driver.go

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -179,15 +179,27 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
179179
// DA throttling loop should always be started except for testing (indicated by ThrottleThreshold == 0)
180180
if l.Config.ThrottleParams.LowerThreshold > 0 {
181181
l.wg.Add(1)
182-
go l.throttlingLoop() // ranges over unsafeBytesUpdated channel
182+
go func() {
183+
l.throttlingLoop() // ranges over unsafeBytesUpdated channel
184+
l.wg.Done()
185+
}()
183186
} else {
184187
l.Log.Warn("Throttling loop is DISABLED due to 0 throttle-threshold. This should not be disabled in prod.")
185188
}
186189

187190
l.wg.Add(3)
188-
go l.receiptsLoop() // ranges over receipts channel
189-
go l.publishingLoop(l.killCtx) // ranges over publishSignal, spawns routines which send on receipts. Closes receipts when done.
190-
go l.blockLoadingLoop(l.shutdownCtx) // sends on unsafeBytesUpdated (if throttling enabled), and publishSignal. Closes them both when done.
191+
go func() {
192+
l.receiptsLoop() // ranges over receipts channel
193+
l.wg.Done()
194+
}()
195+
go func() {
196+
l.publishingLoop(l.killCtx) // ranges over publishSignal, spawns routines which send on receipts. Closes receipts when done.
197+
l.wg.Done()
198+
}()
199+
go func() {
200+
l.blockLoadingLoop(l.shutdownCtx) // sends on unsafeBytesUpdated (if throttling enabled), and publishSignal. Closes them both when done.
201+
l.wg.Done()
202+
}()
191203

192204
l.Log.Info("Batch Submitter started")
193205
return nil
@@ -490,7 +502,6 @@ func (l *BatchSubmitter) syncAndPrune(syncStatus *eth.SyncStatus) *inclusiveBloc
490502
// - sends transactions to the DA layer
491503
func (l *BatchSubmitter) publishingLoop(ctx context.Context) {
492504
defer close(l.receipts)
493-
defer l.wg.Done()
494505

495506
daGroup := &errgroup.Group{}
496507
// errgroup with limit of 0 means no goroutine is able to run concurrently,
@@ -530,7 +541,6 @@ func (l *BatchSubmitter) blockLoadingLoop(ctx context.Context) {
530541
defer ticker.Stop()
531542
defer close(l.unsafeBytesUpdatedSignal)
532543
defer close(l.publishSignal)
533-
defer l.wg.Done()
534544
for {
535545
select {
536546
case <-ticker.C:
@@ -567,7 +577,6 @@ func (l *BatchSubmitter) blockLoadingLoop(ctx context.Context) {
567577

568578
// receiptsLoop handles transaction receipts from the DA layer
569579
func (l *BatchSubmitter) receiptsLoop() {
570-
defer l.wg.Done()
571580
l.Log.Info("Starting receipts processing loop")
572581
for r := range l.receipts {
573582
if errors.Is(r.Err, txpool.ErrAlreadyReserved) && l.txpoolState == TxpoolGood {
@@ -586,8 +595,7 @@ func (l *BatchSubmitter) receiptsLoop() {
586595
}
587596

588597
// singleEndpointThrottler handles throttling for a specific endpoint
589-
func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSignal chan struct{}, endpoint string) {
590-
defer wg.Done()
598+
func (l *BatchSubmitter) singleEndpointThrottler(throttleSignal chan struct{}, endpoint string) {
591599
l.Log.Info("Starting endpoint throttling loop", "endpoint", endpoint)
592600

593601
client, err := rpc.Dial(endpoint)
@@ -674,20 +682,21 @@ func (l *BatchSubmitter) singleEndpointThrottler(wg *sync.WaitGroup, throttleSig
674682
// throttlingLoop acts as a distributor that spawns individual throttling loops for each endpoint
675683
// and fans out the unsafe bytes updates to each endpoint
676684
func (l *BatchSubmitter) throttlingLoop() {
677-
defer l.wg.Done()
678685
l.Log.Info("Starting DA throttling loop",
679686
"controller_type", l.throttleController.GetType(),
680687
"lower_threshold", l.Config.ThrottleParams.LowerThreshold,
681688
"upper_threshold", l.Config.ThrottleParams.UpperThreshold,
682689
)
683-
updateChans := make([]chan struct{}, len(l.Config.ThrottleParams.Endpoints))
684-
685-
innerWg := sync.WaitGroup{}
686690

691+
updateChans := make([]chan struct{}, len(l.Config.ThrottleParams.Endpoints))
692+
var wg sync.WaitGroup
687693
for i, endpoint := range l.Config.ThrottleParams.Endpoints {
688694
updateChans[i] = make(chan struct{}, 1)
689-
innerWg.Add(1)
690-
go l.singleEndpointThrottler(&innerWg, updateChans[i], endpoint)
695+
wg.Add(1)
696+
go func() {
697+
l.singleEndpointThrottler(updateChans[i], endpoint)
698+
wg.Done()
699+
}()
691700
}
692701

693702
for unsafeBytes := range l.unsafeBytesUpdatedSignal {
@@ -723,7 +732,7 @@ func (l *BatchSubmitter) throttlingLoop() {
723732
for _, updateChan := range updateChans {
724733
close(updateChan)
725734
}
726-
innerWg.Wait()
735+
wg.Wait()
727736
}
728737

729738
func (l *BatchSubmitter) waitNodeSyncAndClearState() {

op-batcher/batcher/driver_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,10 @@ func TestBatchSubmitter_ThrottlingEndpoints(t *testing.T) {
256256

257257
// Start throttling loop in a goroutine
258258
bs.wg.Add(1)
259-
go bs.throttlingLoop()
259+
go func() {
260+
bs.throttlingLoop()
261+
bs.wg.Done()
262+
}()
260263

261264
// Simulate block loading by sending periodically on pendingBytesUpdated
262265
wg := sync.WaitGroup{}

0 commit comments

Comments
 (0)