diff --git a/Makefile b/Makefile index a2a29289edc..9e451fdb45d 100644 --- a/Makefile +++ b/Makefile @@ -280,6 +280,7 @@ RALLY_EXTRA_FLAGS?= RALLY_CLIENT_OPTIONS?=basic_auth_user:'admin',basic_auth_password:'changeme' RALLY_FLAGS?=--pipeline=benchmark-only --client-options="$(RALLY_CLIENT_OPTIONS)" $(RALLY_EXTRA_FLAGS) RALLY_BULK_SIZE?=5000 +RALLY_GENCORPORA_REPLAY_COUNT?=1 .PHONY: rally rally: $(PYTHON_BIN)/esrally rally/corpora @@ -295,7 +296,7 @@ $(PYTHON_BIN)/esrally: $(PYTHON_BIN) .PHONY: rally/corpora rally/corpora: @rm -fr rally/corpora && mkdir rally/corpora - @cd systemtest/cmd/gencorpora && $(GO) run . -write-dir $(CURRENT_DIR)/rally/corpora/ + @cd systemtest/cmd/gencorpora && $(GO) run . -write-dir $(CURRENT_DIR)/rally/corpora/ -replay-count $(RALLY_GENCORPORA_REPLAY_COUNT) ############################################################################## # Smoke tests -- Basic smoke tests for APM Server. diff --git a/systemtest/cmd/gencorpora/main.go b/systemtest/cmd/gencorpora/main.go index 1154c323e49..73eeab38a70 100644 --- a/systemtest/cmd/gencorpora/main.go +++ b/systemtest/cmd/gencorpora/main.go @@ -19,90 +19,21 @@ package main import ( "context" - "errors" "flag" - "fmt" "log" "os" "os/signal" "syscall" - "time" - - "github.com/hashicorp/go-multierror" - "golang.org/x/sync/errgroup" "github.com/elastic/apm-server/systemtest/gencorpora" - "github.com/elastic/apm-server/systemtest/loadgen" ) -func run(rootCtx context.Context) error { - // Create fake ES server - esServer, err := gencorpora.NewCatBulkServer() - if err != nil { - return err - } - - // Create APM-Server to send documents to fake ES - apmServer := gencorpora.NewAPMServer(rootCtx, esServer.Addr) - if err := apmServer.Start(); err != nil { - return err - } - - ctx, cancel := context.WithCancel(rootCtx) - g, gctx := errgroup.WithContext(ctx) - g.Go(func() error { - <-gctx.Done() - - var result error - if err := apmServer.Close(); err != nil { - result = multierror.Append(result, err) - } - if err := esServer.Stop(); err != nil { - result = multierror.Append(result, err) - } - - return result - }) - g.Go(esServer.Serve) - g.Go(func() error { - return gencorpora.StreamAPMServerLogs(ctx, apmServer) - }) - g.Go(func() error { - defer cancel() - - waitCtx, waitCancel := context.WithTimeout(gctx, 10*time.Second) - defer waitCancel() - if err := apmServer.WaitForPublishReady(waitCtx); err != nil { - return fmt.Errorf("failed while waiting for APM-Server to be ready with err %v", err) - } - - return generateLoad(ctx, apmServer.URL) - }) - - if err := g.Wait(); !errors.Is(err, context.Canceled) { - return err - } - - return nil -} - -func generateLoad(ctx context.Context, serverURL string) error { - inf := loadgen.GetNewLimiter(0) - handler, err := loadgen.NewEventHandler(`*.ndjson`, serverURL, "", inf) - if err != nil { - return err - } - - _, err = handler.SendBatches(ctx) - return err -} - func main() { flag.Parse() ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() - if err := run(ctx); err != nil { + if err := gencorpora.Run(ctx); err != nil { log.Fatal(err) } } diff --git a/systemtest/gencorpora/catbulk.go b/systemtest/gencorpora/catbulk.go index 6847675b6a5..115e3469104 100644 --- a/systemtest/gencorpora/catbulk.go +++ b/systemtest/gencorpora/catbulk.go @@ -104,7 +104,7 @@ func (s *CatBulkServer) Stop() error { defer s.writer.Close() if err := s.server.Shutdown(ctx); err != nil { - return fmt.Errorf("failed to shutdown cat bulk server with error %v, no metadata written", err) + return fmt.Errorf("failed to shutdown cat bulk server no metadata written: %w", err) } close(s.metaUpdateChan) diff --git a/systemtest/gencorpora/config.go b/systemtest/gencorpora/config.go index e114b2cf246..4b649596de7 100644 --- a/systemtest/gencorpora/config.go +++ b/systemtest/gencorpora/config.go @@ -34,6 +34,7 @@ var gencorporaConfig = struct { CorporaPath string MetadataPath string LoggingLevel zapcore.Level + ReplayCount int }{ CorporaPath: filepath.Join(defaultDir, getCorporaPath(defaultFilePrefix)), MetadataPath: filepath.Join(defaultDir, getMetaPath(defaultFilePrefix)), @@ -55,6 +56,12 @@ func init() { return nil }, ) + flag.IntVar( + &gencorporaConfig.ReplayCount, + "replay-count", + 1, + "Number of times the events are replayed", + ) flag.Var( &gencorporaConfig.LoggingLevel, "logging-level", diff --git a/systemtest/gencorpora/gencorpora.go b/systemtest/gencorpora/gencorpora.go new file mode 100644 index 00000000000..94537dff678 --- /dev/null +++ b/systemtest/gencorpora/gencorpora.go @@ -0,0 +1,99 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package gencorpora + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/hashicorp/go-multierror" + "golang.org/x/sync/errgroup" + + "github.com/elastic/apm-server/systemtest/loadgen" +) + +// Run runs APM-Server and CatBulk server followed by sending +// load to APM-Server and writing the generated corpus to a file. +// Run exits on error or on successful corpora generation. +func Run(rootCtx context.Context) error { + // Create fake ES server + esServer, err := NewCatBulkServer() + if err != nil { + return err + } + + // Create APM-Server to send documents to fake ES + apmServer := NewAPMServer(rootCtx, esServer.Addr) + if err := apmServer.Start(); err != nil { + return err + } + + ctx, cancel := context.WithCancel(rootCtx) + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + <-gctx.Done() + + var result error + if err := apmServer.Close(); err != nil { + result = multierror.Append(result, err) + } + if err := esServer.Stop(); err != nil { + result = multierror.Append(result, err) + } + + return result + }) + g.Go(esServer.Serve) + g.Go(func() error { + return StreamAPMServerLogs(ctx, apmServer) + }) + g.Go(func() error { + defer cancel() + + waitCtx, waitCancel := context.WithTimeout(gctx, 10*time.Second) + defer waitCancel() + if err := apmServer.WaitForPublishReady(waitCtx); err != nil { + return fmt.Errorf("failed while waiting for APM-Server to be ready: %w", err) + } + + return generateLoad(ctx, apmServer.URL, gencorporaConfig.ReplayCount) + }) + + if err := g.Wait(); !errors.Is(err, context.Canceled) { + return err + } + + return nil +} + +func generateLoad(ctx context.Context, serverURL string, replayCount int) error { + inf := loadgen.GetNewLimiter(0) + handler, err := loadgen.NewEventHandler(`*.ndjson`, serverURL, "", inf) + if err != nil { + return err + } + + for i := 0; i < replayCount; i++ { + if _, err = handler.SendBatches(ctx); err != nil { + return fmt.Errorf("failed sending batches on iteration %d: %w", i+1, err) + } + } + return nil +}