Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions pkg/kv/kvserver/raftstorebench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,32 @@ func Run(t T, cfg Config) Result {

q := makeReplicaQueue(cfg.NumReplicas)

var wg sync.WaitGroup
s := newAggStats()

statsCtx, stopStats := context.WithCancel(context.Background())
defer stopStats()
go statsLoop(t, statsCtx, cfg, s, raftEng, smEng)
statsCtx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
var bgWG sync.WaitGroup
bgWG.Add(1)
go func() {
defer bgWG.Done()
statsLoop(t, statsCtx, cfg, s, raftEng, smEng)
}()

o := writeOptions{
cfg: cfg, smEng: smEng, raftEng: raftEng,
keyLen: keyLen, valueLen: valueLen, batchLen: batchLen,
}
var durabilityCallbackCount atomic.Int64
tStartWorkers := timeutil.Now()

var workerWG sync.WaitGroup
for i := 0; i < cfg.NumWorkers; i++ {
wg.Add(1)
workerWG.Add(1)
w := &worker{
t: t, s: s, o: o, rng: rand.New(rand.NewSource(int64(i))),
durabilityCallbackCount: &durabilityCallbackCount,
}
go w.run(t, q, &wg)
go w.run(t, q, &workerWG)
}
logf(t, "started workers")

Expand All @@ -63,7 +69,9 @@ func Run(t T, cfg Config) Result {
var bytesFlushed uint64
var n int
notifyCh := make(chan struct{}, 1)
bgWG.Add(1)
go func() {
defer bgWG.Done()
for {
select {
case <-statsCtx.Done():
Expand All @@ -87,8 +95,9 @@ func Run(t T, cfg Config) Result {
})
}

wg.Wait()
stopStats()
workerWG.Wait()
cancelCtx()
bgWG.Wait() // make sure all goroutines are stopped by time engine closes
duration := timeutil.Since(tStartWorkers)
logf(t, "done working")

Expand Down