Skip to content

Commit

Permalink
test: improve stability of BenchmarkPublisher (#11787) (#12065)
Browse files Browse the repository at this point in the history
fix race condition with indexed int by using an atomic int and
decompress the body for accurate validation logic

(cherry picked from commit bdd5b8d)

Co-authored-by: kruskall <[email protected]>
  • Loading branch information
mergify[bot] and kruskall authored Nov 16, 2023
1 parent bd3ef40 commit bdd99db
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions internal/publish/pub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package publish_test

import (
"bufio"
"compress/gzip"
"context"
"fmt"
"net/http"
Expand Down Expand Up @@ -102,16 +103,21 @@ func BenchmarkPublisher(b *testing.B) {
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})

var indexed int64
var indexed atomic.Int64
mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
scanner := bufio.NewScanner(r.Body)
gzr, err := gzip.NewReader(r.Body)
assert.NoError(b, err)
defer gzr.Close()

scanner := bufio.NewScanner(gzr)
var n int64
for scanner.Scan() {
if scanner.Scan() {
for scanner.Scan() { // index
if scanner.Scan() { // actual event
n++
}
}
atomic.AddInt64(&indexed, n)
assert.NoError(b, scanner.Err())
indexed.Add(n)
})
srv := httptest.NewServer(mux)
defer srv.Close()
Expand Down Expand Up @@ -169,7 +175,7 @@ func BenchmarkPublisher(b *testing.B) {
assert.NoError(b, publisher.Stop(context.Background()))
assert.NoError(b, acker.Wait(context.Background()))
assert.NoError(b, pipeline.Close())
assert.Equal(b, int64(b.N), indexed)
assert.Equal(b, int64(b.N), indexed.Load())
}

func newBlockingPipeline(t testing.TB) (*pipeline.Pipeline, *mockClient) {
Expand Down

0 comments on commit bdd99db

Please sign in to comment.