diff --git a/internal/publish/pub_test.go b/internal/publish/pub_test.go index a22389f84a5..37c1be50675 100644 --- a/internal/publish/pub_test.go +++ b/internal/publish/pub_test.go @@ -19,6 +19,7 @@ package publish_test import ( "bufio" + "compress/gzip" "context" "fmt" "net/http" @@ -102,16 +103,21 @@ func BenchmarkPublisher(b *testing.B) { fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) }) - var indexed int64 + var indexed atomic.Int64 mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { - scanner := bufio.NewScanner(r.Body) + gzr, err := gzip.NewReader(r.Body) + assert.NoError(b, err) + defer gzr.Close() + + scanner := bufio.NewScanner(gzr) var n int64 - for scanner.Scan() { - if scanner.Scan() { + for scanner.Scan() { // index + if scanner.Scan() { // actual event n++ } } - atomic.AddInt64(&indexed, n) + assert.NoError(b, scanner.Err()) + indexed.Add(n) }) srv := httptest.NewServer(mux) defer srv.Close() @@ -169,7 +175,7 @@ func BenchmarkPublisher(b *testing.B) { assert.NoError(b, publisher.Stop(context.Background())) assert.NoError(b, acker.Wait(context.Background())) assert.NoError(b, pipeline.Close()) - assert.Equal(b, int64(b.N), indexed) + assert.Equal(b, int64(b.N), indexed.Load()) } func newBlockingPipeline(t testing.TB) (*pipeline.Pipeline, *mockClient) {