Skip to content

Commit

Permalink
Allow gencorpora to replay events (#8952)
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar authored Aug 25, 2022
1 parent 6c9e936 commit f978866
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 72 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ RALLY_EXTRA_FLAGS?=
RALLY_CLIENT_OPTIONS?=basic_auth_user:'admin',basic_auth_password:'changeme'
RALLY_FLAGS?=--pipeline=benchmark-only --client-options="$(RALLY_CLIENT_OPTIONS)" $(RALLY_EXTRA_FLAGS)
RALLY_BULK_SIZE?=5000
RALLY_GENCORPORA_REPLAY_COUNT?=1

.PHONY: rally
rally: $(PYTHON_BIN)/esrally rally/corpora
Expand All @@ -295,7 +296,7 @@ $(PYTHON_BIN)/esrally: $(PYTHON_BIN)
.PHONY: rally/corpora
rally/corpora:
@rm -fr rally/corpora && mkdir rally/corpora
@cd systemtest/cmd/gencorpora && $(GO) run . -write-dir $(CURRENT_DIR)/rally/corpora/
@cd systemtest/cmd/gencorpora && $(GO) run . -write-dir $(CURRENT_DIR)/rally/corpora/ -replay-count $(RALLY_GENCORPORA_REPLAY_COUNT)

##############################################################################
# Smoke tests -- Basic smoke tests for APM Server.
Expand Down
71 changes: 1 addition & 70 deletions systemtest/cmd/gencorpora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,90 +19,21 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-server/systemtest/gencorpora"
"github.com/elastic/apm-server/systemtest/loadgen"
)

func run(rootCtx context.Context) error {
// Create fake ES server
esServer, err := gencorpora.NewCatBulkServer()
if err != nil {
return err
}

// Create APM-Server to send documents to fake ES
apmServer := gencorpora.NewAPMServer(rootCtx, esServer.Addr)
if err := apmServer.Start(); err != nil {
return err
}

ctx, cancel := context.WithCancel(rootCtx)
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
<-gctx.Done()

var result error
if err := apmServer.Close(); err != nil {
result = multierror.Append(result, err)
}
if err := esServer.Stop(); err != nil {
result = multierror.Append(result, err)
}

return result
})
g.Go(esServer.Serve)
g.Go(func() error {
return gencorpora.StreamAPMServerLogs(ctx, apmServer)
})
g.Go(func() error {
defer cancel()

waitCtx, waitCancel := context.WithTimeout(gctx, 10*time.Second)
defer waitCancel()
if err := apmServer.WaitForPublishReady(waitCtx); err != nil {
return fmt.Errorf("failed while waiting for APM-Server to be ready with err %v", err)
}

return generateLoad(ctx, apmServer.URL)
})

if err := g.Wait(); !errors.Is(err, context.Canceled) {
return err
}

return nil
}

func generateLoad(ctx context.Context, serverURL string) error {
inf := loadgen.GetNewLimiter(0)
handler, err := loadgen.NewEventHandler(`*.ndjson`, serverURL, "", inf)
if err != nil {
return err
}

_, err = handler.SendBatches(ctx)
return err
}

func main() {
flag.Parse()

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
if err := run(ctx); err != nil {
if err := gencorpora.Run(ctx); err != nil {
log.Fatal(err)
}
}
2 changes: 1 addition & 1 deletion systemtest/gencorpora/catbulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *CatBulkServer) Stop() error {
defer s.writer.Close()

if err := s.server.Shutdown(ctx); err != nil {
return fmt.Errorf("failed to shutdown cat bulk server with error %v, no metadata written", err)
return fmt.Errorf("failed to shutdown cat bulk server no metadata written: %w", err)
}

close(s.metaUpdateChan)
Expand Down
7 changes: 7 additions & 0 deletions systemtest/gencorpora/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var gencorporaConfig = struct {
CorporaPath string
MetadataPath string
LoggingLevel zapcore.Level
ReplayCount int
}{
CorporaPath: filepath.Join(defaultDir, getCorporaPath(defaultFilePrefix)),
MetadataPath: filepath.Join(defaultDir, getMetaPath(defaultFilePrefix)),
Expand All @@ -55,6 +56,12 @@ func init() {
return nil
},
)
flag.IntVar(
&gencorporaConfig.ReplayCount,
"replay-count",
1,
"Number of times the events are replayed",
)
flag.Var(
&gencorporaConfig.LoggingLevel,
"logging-level",
Expand Down
99 changes: 99 additions & 0 deletions systemtest/gencorpora/gencorpora.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package gencorpora

import (
"context"
"errors"
"fmt"
"time"

"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-server/systemtest/loadgen"
)

// Run runs APM-Server and CatBulk server followed by sending
// load to APM-Server and writing the generated corpus to a file.
// Run exits on error or on successful corpora generation.
func Run(rootCtx context.Context) error {
// Create fake ES server
esServer, err := NewCatBulkServer()
if err != nil {
return err
}

// Create APM-Server to send documents to fake ES
apmServer := NewAPMServer(rootCtx, esServer.Addr)
if err := apmServer.Start(); err != nil {
return err
}

ctx, cancel := context.WithCancel(rootCtx)
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
<-gctx.Done()

var result error
if err := apmServer.Close(); err != nil {
result = multierror.Append(result, err)
}
if err := esServer.Stop(); err != nil {
result = multierror.Append(result, err)
}

return result
})
g.Go(esServer.Serve)
g.Go(func() error {
return StreamAPMServerLogs(ctx, apmServer)
})
g.Go(func() error {
defer cancel()

waitCtx, waitCancel := context.WithTimeout(gctx, 10*time.Second)
defer waitCancel()
if err := apmServer.WaitForPublishReady(waitCtx); err != nil {
return fmt.Errorf("failed while waiting for APM-Server to be ready: %w", err)
}

return generateLoad(ctx, apmServer.URL, gencorporaConfig.ReplayCount)
})

if err := g.Wait(); !errors.Is(err, context.Canceled) {
return err
}

return nil
}

func generateLoad(ctx context.Context, serverURL string, replayCount int) error {
inf := loadgen.GetNewLimiter(0)
handler, err := loadgen.NewEventHandler(`*.ndjson`, serverURL, "", inf)
if err != nil {
return err
}

for i := 0; i < replayCount; i++ {
if _, err = handler.SendBatches(ctx); err != nil {
return fmt.Errorf("failed sending batches on iteration %d: %w", i+1, err)
}
}
return nil
}

0 comments on commit f978866

Please sign in to comment.