diff --git a/pkg/kv/kvserver/raftstorebench/bench.go b/pkg/kv/kvserver/raftstorebench/bench.go index 0bbff01d9509..144d6ab6c0fa 100644 --- a/pkg/kv/kvserver/raftstorebench/bench.go +++ b/pkg/kv/kvserver/raftstorebench/bench.go @@ -28,12 +28,16 @@ func Run(t T, cfg Config) Result { q := makeReplicaQueue(cfg.NumReplicas) - var wg sync.WaitGroup s := newAggStats() - statsCtx, stopStats := context.WithCancel(context.Background()) - defer stopStats() - go statsLoop(t, statsCtx, cfg, s, raftEng, smEng) + statsCtx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + var bgWG sync.WaitGroup + bgWG.Add(1) + go func() { + defer bgWG.Done() + statsLoop(t, statsCtx, cfg, s, raftEng, smEng) + }() o := writeOptions{ cfg: cfg, smEng: smEng, raftEng: raftEng, @@ -41,13 +45,15 @@ func Run(t T, cfg Config) Result { } var durabilityCallbackCount atomic.Int64 tStartWorkers := timeutil.Now() + + var workerWG sync.WaitGroup for i := 0; i < cfg.NumWorkers; i++ { - wg.Add(1) + workerWG.Add(1) w := &worker{ t: t, s: s, o: o, rng: rand.New(rand.NewSource(int64(i))), durabilityCallbackCount: &durabilityCallbackCount, } - go w.run(t, q, &wg) + go w.run(t, q, &workerWG) } logf(t, "started workers") @@ -63,7 +69,9 @@ func Run(t T, cfg Config) Result { var bytesFlushed uint64 var n int notifyCh := make(chan struct{}, 1) + bgWG.Add(1) go func() { + defer bgWG.Done() for { select { case <-statsCtx.Done(): @@ -87,8 +95,9 @@ func Run(t T, cfg Config) Result { }) } - wg.Wait() - stopStats() + workerWG.Wait() + cancelCtx() + bgWG.Wait() // make sure all goroutines are stopped by time engine closes duration := timeutil.Since(tStartWorkers) logf(t, "done working")