From bdd99db9f72c989c486b46dd2b65adf73380bb60 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 16 Nov 2023 16:24:33 +0000 Subject: [PATCH] test: improve stability of BenchmarkPublisher (#11787) (#12065) fix race condition with indexed int by using an atomic int and decompress the body for accurate validation logic (cherry picked from commit bdd5b8d90fe64c2d5fdebdf9b154eea5ef96239d) Co-authored-by: kruskall <99559985+kruskall@users.noreply.github.com> --- internal/publish/pub_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/internal/publish/pub_test.go b/internal/publish/pub_test.go index a22389f84a5..37c1be50675 100644 --- a/internal/publish/pub_test.go +++ b/internal/publish/pub_test.go @@ -19,6 +19,7 @@ package publish_test import ( "bufio" + "compress/gzip" "context" "fmt" "net/http" @@ -102,16 +103,21 @@ func BenchmarkPublisher(b *testing.B) { fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) }) - var indexed int64 + var indexed atomic.Int64 mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { - scanner := bufio.NewScanner(r.Body) + gzr, err := gzip.NewReader(r.Body) + assert.NoError(b, err) + defer gzr.Close() + + scanner := bufio.NewScanner(gzr) var n int64 - for scanner.Scan() { - if scanner.Scan() { + for scanner.Scan() { // index + if scanner.Scan() { // actual event n++ } } - atomic.AddInt64(&indexed, n) + assert.NoError(b, scanner.Err()) + indexed.Add(n) }) srv := httptest.NewServer(mux) defer srv.Close() @@ -169,7 +175,7 @@ func BenchmarkPublisher(b *testing.B) { assert.NoError(b, publisher.Stop(context.Background())) assert.NoError(b, acker.Wait(context.Background())) assert.NoError(b, pipeline.Close()) - assert.Equal(b, int64(b.N), indexed) + assert.Equal(b, int64(b.N), indexed.Load()) } func newBlockingPipeline(t testing.TB) (*pipeline.Pipeline, *mockClient) {