Skip to content

Commit

Permalink
Add gencorpora cmd to generate ES corpora for rally
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar committed Aug 16, 2022
1 parent cf796e2 commit b6a8aac
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 17 deletions.
11 changes: 6 additions & 5 deletions systemtest/benchtest/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/credentials"

"github.com/elastic/apm-server/systemtest/loadgen"
loadgencfg "github.com/elastic/apm-server/systemtest/loadgen/config"
"github.com/elastic/apm-server/systemtest/loadgen/eventhandler"

"go.elastic.co/apm/v2"
Expand All @@ -40,8 +41,8 @@ import (
// to send to the target APM Server.
func NewTracer(tb testing.TB) *apm.Tracer {
httpTransport, err := transport.NewHTTPTransport(transport.HTTPTransportOptions{
ServerURLs: []*url.URL{loadgen.Config.ServerURL},
SecretToken: loadgen.Config.SecretToken,
ServerURLs: []*url.URL{loadgencfg.Config.ServerURL},
SecretToken: loadgencfg.Config.SecretToken,
})
if err != nil {
tb.Fatal(err)
Expand All @@ -59,8 +60,8 @@ func NewTracer(tb testing.TB) *apm.Tracer {
// NewOTLPExporter returns a new OpenTelemetry Go exporter, configured
// to export to the target APM Server.
func NewOTLPExporter(tb testing.TB) *otlptrace.Exporter {
serverURL := loadgen.Config.ServerURL
secretToken := loadgen.Config.SecretToken
serverURL := loadgencfg.Config.ServerURL
secretToken := loadgencfg.Config.SecretToken
endpoint := serverURL.Host
if serverURL.Port() == "" {
switch serverURL.Scheme {
Expand Down Expand Up @@ -98,7 +99,7 @@ func NewOTLPExporter(tb testing.TB) *otlptrace.Exporter {
// NewEventHandler creates a eventhandler which loads the files matching the
// passed regex.
func NewEventHandler(tb testing.TB, p string, l *rate.Limiter) *eventhandler.Handler {
serverCfg := loadgen.Config
serverCfg := loadgencfg.Config
h, err := loadgen.NewEventHandler(p, serverCfg.ServerURL.String(), serverCfg.SecretToken, l)
if err != nil {
tb.Fatal(err)
Expand Down
13 changes: 7 additions & 6 deletions systemtest/benchtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/elastic/apm-server/systemtest/benchtest/expvar"
"github.com/elastic/apm-server/systemtest/loadgen"
loadgencfg "github.com/elastic/apm-server/systemtest/loadgen/config"
)

const waitInactiveTimeout = 30 * time.Second
Expand All @@ -62,15 +63,15 @@ func runBenchmark(f BenchmarkFunc) (testing.BenchmarkResult, bool, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error
server := loadgen.Config.ServerURL.String()
server := loadgencfg.Config.ServerURL.String()
collector, err = expvar.StartNewCollector(ctx, server, 100*time.Millisecond)
if err != nil {
b.Error(err)
ok = !b.Failed()
return
}

limiter := loadgen.GetNewLimiter(loadgen.Config.MaxEPM)
limiter := loadgen.GetNewLimiter(loadgencfg.Config.MaxEPM)
b.ResetTimer()
f(b, limiter)
if !b.Failed() {
Expand Down Expand Up @@ -149,7 +150,7 @@ func Run(allBenchmarks ...BenchmarkFunc) error {
}
// Sets the http.DefaultClient.Transport.TLSClientConfig.InsecureSkipVerify
// to match the "-secure" flag value.
verifyTLS := loadgen.Config.Secure
verifyTLS := loadgencfg.Config.Secure
http.DefaultClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: !verifyTLS},
}
Expand Down Expand Up @@ -196,8 +197,8 @@ func Run(allBenchmarks ...BenchmarkFunc) error {
// value in the list will be used.
if len(agentsList) > 0 && benchConfig.WarmupTime.Seconds() > 0 {
agents := agentsList[0]
serverURL := loadgen.Config.ServerURL.String()
secretToken := loadgen.Config.SecretToken
serverURL := loadgencfg.Config.ServerURL.String()
secretToken := loadgencfg.Config.SecretToken
if err := warmup(agents, benchConfig.WarmupTime, serverURL, secretToken); err != nil {
return fmt.Errorf("warm-up failed with %d agents: %v", agents, err)
}
Expand Down Expand Up @@ -234,7 +235,7 @@ func warmup(agents int, duration time.Duration, url, token string) error {
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()

rl := loadgen.GetNewLimiter(loadgen.Config.MaxEPM)
rl := loadgen.GetNewLimiter(loadgencfg.Config.MaxEPM)
var wg sync.WaitGroup
for i := 0; i < agents; i++ {
h, err := loadgen.NewEventHandler(`*.ndjson`, url, token, rl)
Expand Down
4 changes: 2 additions & 2 deletions systemtest/benchtest/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (

"github.com/google/pprof/profile"

"github.com/elastic/apm-server/systemtest/loadgen"
loadgencfg "github.com/elastic/apm-server/systemtest/loadgen/config"
)

func fetchProfile(urlPath string, duration time.Duration) (*profile.Profile, error) {
serverURL := loadgen.Config.ServerURL.String()
serverURL := loadgencfg.Config.ServerURL.String()
req, err := http.NewRequest("GET", serverURL+urlPath, nil)
if err != nil {
return nil, err
Expand Down
110 changes: 110 additions & 0 deletions systemtest/cmd/gencorpora/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
"context"
"flag"
"fmt"
"io"
"log"
"os"
"path/filepath"
"time"

"github.com/elastic/apm-server/systemtest/gencorpora"
"github.com/elastic/apm-server/systemtest/loadgen"
)

func main() {
corporaDir := flag.String("corpora-dir", "", "output directory for corpora ndjson files")
flag.Parse()

f, err := getWriter(*corporaDir)
if err != nil {
log.Fatal(err)
}
defer f.Close()

// Create fake ES server
esServer, err := gencorpora.GetCatBulkServer(f)
if err != nil {
log.Fatal(err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := esServer.Shutdown(ctx); err != nil {
log.Fatal("failed to shutdown cat bulk server", err)
}
cancel()
}()

// Create APM-Server to send documents to fake ES
apmServer, err := gencorpora.GetAPMServer(esServer.Addr)
if err != nil {
log.Fatal(err)
}
defer func() {
if err := apmServer.Cmd.Process.Signal(os.Interrupt); err != nil {
apmServer.Cmd.Process.Kill()
log.Fatal("failed to interrupt process, initiated kill", err)
}
if err := apmServer.Wait(); err != nil {
log.Fatal("failed while waiting for APM-Server to exit", err)
}
}()

// Start fake ES and APM-Server
go func() {
err := esServer.Serve()
if err != nil {
log.Fatal(err)
}
}()
if err := apmServer.Start(); err != nil {
log.Fatal(err)
}

// Wait for APM Server to be ready to accept requests
<-time.After(2 * time.Second)

// Send data to APM-Server
if err := generateLoad(context.Background(), "http://127.0.0.1:8200"); err != nil {
log.Fatal("failed to generate load", err)
}
}

func getWriter(corporaDir string) (io.WriteCloser, error) {
if corporaDir == "" {
return os.Stdout, nil
}
return os.Create(
filepath.Join(corporaDir, fmt.Sprintf("%s.ndjson", time.Now().Format(time.RFC3339))),
)
}

func generateLoad(ctx context.Context, serverURL string) error {
inf := loadgen.GetNewLimiter(0)
handler, err := loadgen.NewEventHandler(`*.ndjson`, serverURL, "", inf)
if err != nil {
return err
}

_, err = handler.SendBatches(ctx)
return err
}
38 changes: 38 additions & 0 deletions systemtest/gencorpora/apmserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package gencorpora

import (
"bufio"
"context"
"fmt"
"log"

"github.com/elastic/apm-server/systemtest/apmservertest"
)

func GetAPMServer(esHost string) (*apmservertest.ServerCmd, error) {
args := []string{
"--strict.perms=false",
"-E", "logging.level=warning",
"-E", "logging.to_stderr=true",
"-E", "apm-server.data_streams.wait_for_integration=false",
"-E", "apm-server.host=127.0.0.1:8200",
"-E", fmt.Sprintf("output.elasticsearch.hosts=['%s']", esHost),
}

cmd := apmservertest.ServerCommand(context.Background(), "run", args...)
reader, err := cmd.StderrPipe()
if err != nil {
return nil, err
}

// log streaming
go func() {
scanner := bufio.NewScanner(reader)

for scanner.Scan() {
log.Println(scanner.Text())
}
}()

return cmd, nil
}
86 changes: 86 additions & 0 deletions systemtest/gencorpora/catbulk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package gencorpora

import (
"bufio"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
)

// CatBulkServer wraps http server and a listener to listen
// for ES requests on any available port
type CatBulkServer struct {
*http.Server
listener net.Listener
}

// GetCatBulkServer returns a HTTP Server which can serve as a
// fake ES server writing the response of the bulk request to the
// provided writer. Writes to the provided writer must be thread safe.
func GetCatBulkServer(writer io.Writer) (*CatBulkServer, error) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
return nil, err
}

return &CatBulkServer{
listener: listener,
Server: &http.Server{
Addr: listener.Addr().String(),
Handler: handleReq(writer),
},
}, nil
}

// Serve accepts incoming requests to the fake ES server on a listener
func (s *CatBulkServer) Serve() error {
return s.Server.Serve(s.listener)
}

func handleReq(writer io.Writer) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
switch req.Method {
case http.MethodGet:
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"cluster_uuid": "cat_bulk"}`))
case http.MethodPost:
reader := req.Body
defer req.Body.Close()

if encoding := req.Header.Get("Content-Encoding"); encoding == "gzip" {
var err error
reader, err = gzip.NewReader(reader)
if err != nil {
log.Println("failed to read request body", err)
w.WriteHeader(http.StatusBadRequest)
return
}
}

respItems := make(map[string]map[string]int)
scanner := bufio.NewScanner(reader)

for count := 0; scanner.Scan(); count++ {
fmt.Fprintln(writer, scanner.Text())
if count%2 == 0 {
respItems["action"] = map[string]int{"status": 200}
}
}

resp, err := json.Marshal(respItems)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write(resp)
default:
w.WriteHeader(http.StatusNotFound)
}
})
}
2 changes: 2 additions & 0 deletions systemtest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/elastic/go-elasticsearch/v8 v8.0.0-20211026132249-f55eeca23be5
github.com/gofrs/uuid v4.1.0+incompatible
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38
github.com/hashicorp/go-multierror v1.1.1
github.com/jaegertracing/jaeger v1.18.1
github.com/mitchellh/mapstructure v1.1.2
github.com/spf13/cobra v1.1.3
Expand Down Expand Up @@ -43,6 +44,7 @@ require (
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.8.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions systemtest/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -487,13 +487,15 @@ github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMW
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI=
github.com/hashicorp/go-hclog v0.14.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-plugin v1.3.0/go.mod h1:F9eH4LrE/ZsRdbwhfjs9k9HoDUwAHnYtXdgmf1AVNs0=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package loadgen
package loadgencfg

import (
"flag"
Expand Down
Loading

0 comments on commit b6a8aac

Please sign in to comment.