Skip to content

Commit

Permalink
Add gencorpora cmd to generate ES corpora for rally (#8878)
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar authored Aug 18, 2022
1 parent 13bf70a commit 8f4f7b0
Show file tree
Hide file tree
Showing 13 changed files with 489 additions and 18 deletions.
5 changes: 5 additions & 0 deletions systemtest/apmservertest/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (c *ServerCmd) Wait() error {
return c.Cmd.Wait()
}

// InterruptProcess sends an interrupt signal
func (c *ServerCmd) InterruptProcess() error {
return interruptProcess(c.Process)
}

func (c *ServerCmd) prestart() error {
if c.buildError != nil {
return c.buildError
Expand Down
2 changes: 1 addition & 1 deletion systemtest/apmservertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (s *Server) consumeStderr(procStderr io.Reader) {
// the server.
func (s *Server) Close() error {
if s.cmd != nil {
if err := interruptProcess(s.cmd.Process); err != nil {
if err := s.cmd.InterruptProcess(); err != nil {
s.cmd.Process.Kill()
}
}
Expand Down
11 changes: 6 additions & 5 deletions systemtest/benchtest/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/credentials"

"github.com/elastic/apm-server/systemtest/loadgen"
loadgencfg "github.com/elastic/apm-server/systemtest/loadgen/config"
"github.com/elastic/apm-server/systemtest/loadgen/eventhandler"

"go.elastic.co/apm/v2"
Expand All @@ -40,8 +41,8 @@ import (
// to send to the target APM Server.
func NewTracer(tb testing.TB) *apm.Tracer {
httpTransport, err := transport.NewHTTPTransport(transport.HTTPTransportOptions{
ServerURLs: []*url.URL{loadgen.Config.ServerURL},
SecretToken: loadgen.Config.SecretToken,
ServerURLs: []*url.URL{loadgencfg.Config.ServerURL},
SecretToken: loadgencfg.Config.SecretToken,
})
if err != nil {
tb.Fatal(err)
Expand All @@ -59,8 +60,8 @@ func NewTracer(tb testing.TB) *apm.Tracer {
// NewOTLPExporter returns a new OpenTelemetry Go exporter, configured
// to export to the target APM Server.
func NewOTLPExporter(tb testing.TB) *otlptrace.Exporter {
serverURL := loadgen.Config.ServerURL
secretToken := loadgen.Config.SecretToken
serverURL := loadgencfg.Config.ServerURL
secretToken := loadgencfg.Config.SecretToken
endpoint := serverURL.Host
if serverURL.Port() == "" {
switch serverURL.Scheme {
Expand Down Expand Up @@ -98,7 +99,7 @@ func NewOTLPExporter(tb testing.TB) *otlptrace.Exporter {
// NewEventHandler creates a eventhandler which loads the files matching the
// passed regex.
func NewEventHandler(tb testing.TB, p string, l *rate.Limiter) *eventhandler.Handler {
serverCfg := loadgen.Config
serverCfg := loadgencfg.Config
h, err := loadgen.NewEventHandler(p, serverCfg.ServerURL.String(), serverCfg.SecretToken, l)
if err != nil {
tb.Fatal(err)
Expand Down
13 changes: 7 additions & 6 deletions systemtest/benchtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/elastic/apm-server/systemtest/benchtest/expvar"
"github.com/elastic/apm-server/systemtest/loadgen"
loadgencfg "github.com/elastic/apm-server/systemtest/loadgen/config"
)

const waitInactiveTimeout = 30 * time.Second
Expand All @@ -62,15 +63,15 @@ func runBenchmark(f BenchmarkFunc) (testing.BenchmarkResult, bool, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error
server := loadgen.Config.ServerURL.String()
server := loadgencfg.Config.ServerURL.String()
collector, err = expvar.StartNewCollector(ctx, server, 100*time.Millisecond)
if err != nil {
b.Error(err)
ok = !b.Failed()
return
}

limiter := loadgen.GetNewLimiter(loadgen.Config.MaxEPM)
limiter := loadgen.GetNewLimiter(loadgencfg.Config.MaxEPM)
b.ResetTimer()
f(b, limiter)
if !b.Failed() {
Expand Down Expand Up @@ -149,7 +150,7 @@ func Run(allBenchmarks ...BenchmarkFunc) error {
}
// Sets the http.DefaultClient.Transport.TLSClientConfig.InsecureSkipVerify
// to match the "-secure" flag value.
verifyTLS := loadgen.Config.Secure
verifyTLS := loadgencfg.Config.Secure
http.DefaultClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: !verifyTLS},
}
Expand Down Expand Up @@ -196,8 +197,8 @@ func Run(allBenchmarks ...BenchmarkFunc) error {
// value in the list will be used.
if len(agentsList) > 0 && benchConfig.WarmupTime.Seconds() > 0 {
agents := agentsList[0]
serverURL := loadgen.Config.ServerURL.String()
secretToken := loadgen.Config.SecretToken
serverURL := loadgencfg.Config.ServerURL.String()
secretToken := loadgencfg.Config.SecretToken
if err := warmup(agents, benchConfig.WarmupTime, serverURL, secretToken); err != nil {
return fmt.Errorf("warm-up failed with %d agents: %v", agents, err)
}
Expand Down Expand Up @@ -234,7 +235,7 @@ func warmup(agents int, duration time.Duration, url, token string) error {
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()

rl := loadgen.GetNewLimiter(loadgen.Config.MaxEPM)
rl := loadgen.GetNewLimiter(loadgencfg.Config.MaxEPM)
var wg sync.WaitGroup
for i := 0; i < agents; i++ {
h, err := loadgen.NewEventHandler(`*.ndjson`, url, token, rl)
Expand Down
4 changes: 2 additions & 2 deletions systemtest/benchtest/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (

"github.com/google/pprof/profile"

"github.com/elastic/apm-server/systemtest/loadgen"
loadgencfg "github.com/elastic/apm-server/systemtest/loadgen/config"
)

func fetchProfile(urlPath string, duration time.Duration) (*profile.Profile, error) {
serverURL := loadgen.Config.ServerURL.String()
serverURL := loadgencfg.Config.ServerURL.String()
req, err := http.NewRequest("GET", serverURL+urlPath, nil)
if err != nil {
return nil, err
Expand Down
106 changes: 106 additions & 0 deletions systemtest/cmd/gencorpora/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-server/systemtest/gencorpora"
"github.com/elastic/apm-server/systemtest/loadgen"
)

const apmHost = "127.0.0.1:8200"

func run(rootCtx context.Context) error {
// Create fake ES server
esServer, err := gencorpora.NewCatBulkServer()
if err != nil {
return err
}

// Create APM-Server to send documents to fake ES
apmServer := gencorpora.NewAPMServer(rootCtx, esServer.Addr, apmHost)
apmServer.StreamLogs(rootCtx)

ctx, cancel := context.WithCancel(rootCtx)
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
<-gctx.Done()

var result error
if err := apmServer.Stop(); err != nil {
result = multierror.Append(result, err)
}
if err := esServer.Stop(); err != nil {
result = multierror.Append(result, err)
}

return result
})
g.Go(esServer.Serve)
g.Go(apmServer.Start)
g.Go(func() error {
defer cancel()

waitCtx, waitCancel := context.WithTimeout(gctx, 10*time.Second)
defer waitCancel()
if err := apmServer.WaitForPublishReady(waitCtx); err != nil {
return fmt.Errorf("failed while waiting for APM-Server to be ready with err %v", err)
}

return generateLoad(ctx, "http://127.0.0.1:8200")
})

if err := g.Wait(); !errors.Is(err, context.Canceled) {
return err
}

return nil
}

func generateLoad(ctx context.Context, serverURL string) error {
inf := loadgen.GetNewLimiter(0)
handler, err := loadgen.NewEventHandler(`*.ndjson`, serverURL, "", inf)
if err != nil {
return err
}

_, err = handler.SendBatches(ctx)
return err
}

func main() {
flag.Parse()

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
if err := run(ctx); err != nil {
log.Fatal(err)
}
}
Loading

0 comments on commit 8f4f7b0

Please sign in to comment.