Skip to content

Commit

Permalink
chore: jobsdb benchmark cli tool
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Dec 12, 2024
1 parent f22e8f4 commit fe9d001
Show file tree
Hide file tree
Showing 5 changed files with 432 additions and 3 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ require (
github.com/rudderlabs/sql-tunnels v0.1.7
github.com/rudderlabs/sqlconnect-go v1.13.0
github.com/samber/lo v1.47.0
github.com/schollz/progressbar/v3 v3.17.1
github.com/segmentio/go-hll v1.0.1
github.com/segmentio/kafka-go v0.4.47
github.com/segmentio/ksuid v1.0.4
Expand Down Expand Up @@ -118,6 +119,7 @@ require (
github.com/apache/arrow-go/v18 v18.0.0 // indirect
github.com/apache/arrow/go/v16 v16.0.0 // indirect
github.com/dgraph-io/ristretto/v2 v2.0.0 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/spf13/viper v1.19.0 // indirect
)

Expand Down Expand Up @@ -342,7 +344,7 @@ require (
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/term v0.26.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0
golang.org/x/tools v0.26.0 // indirect
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chengxilo/virtualterm v1.0.4 h1:Z6IpERbRVlfB8WkOmtbHiDbBANU7cimRIof7mk9/PwM=
github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0iwWCOK1q10rlY=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -1010,6 +1012,8 @@ github.com/minio/minio-go/v7 v7.0.34/go.mod h1:nCrRzjoSUQh8hgKKtu3Y708OLvRLtuASM
github.com/minio/minio-go/v7 v7.0.81 h1:SzhMN0TQ6T/xSBu6Nvw3M5M8voM+Ht8RH3hE8S7zxaA=
github.com/minio/minio-go/v7 v7.0.81/go.mod h1:84gmIilaX4zcvAWWzJ5Z1WI5axN+hAbM5w25xf8xvC0=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
Expand Down Expand Up @@ -1193,6 +1197,8 @@ github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWR
github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/schollz/progressbar/v3 v3.17.1 h1:bI1MTaoQO+v5kzklBjYNRQLoVpe0zbyRZNK6DFkVC5U=
github.com/schollz/progressbar/v3 v3.17.1/go.mod h1:RzqpnsPQNjUyIgdglUjRLgD7sVnxN1wpmBMV+UiEbL4=
github.com/scylladb/gocql v1.14.4 h1:MhevwCfyAraQ6RvZYFO3pF4Lt0YhvQlfg8Eo2HEqVQA=
github.com/scylladb/gocql v1.14.4/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
Expand Down Expand Up @@ -1628,8 +1634,8 @@ golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU=
golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
15 changes: 15 additions & 0 deletions jobsdb/cmd/bench/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# syntax=docker/dockerfile:1

# GO_VERSION is updated automatically to match go.mod, see Makefile
ARG GO_VERSION=1.23.3
ARG ALPINE_VERSION=3.20

# Build stage: compile the jobsdb benchmark CLI from the repository root.
FROM golang:${GO_VERSION}-alpine${ALPINE_VERSION} AS builder
RUN mkdir /app
WORKDIR /app
COPY . .
RUN go mod download && go build -o benchJobsdb ./jobsdb/cmd/bench/.

# Runtime stage: only the compiled binary is copied over.
FROM alpine:${ALPINE_VERSION}
WORKDIR /root/
COPY --from=builder /app/benchJobsdb ./
# Exec form is parsed as a JSON array, so the elements must be double-quoted;
# with single quotes Docker falls back to shell form and tries to execute the
# literal text.
# NOTE(review): this default command does not start benchJobsdb - presumably
# the binary is invoked explicitly by whoever runs the container; confirm.
CMD ["cat", "/dev/stdout"]
279 changes: 279 additions & 0 deletions jobsdb/cmd/bench/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"strconv"
"time"

"github.com/google/uuid"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
trand "github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/schollz/progressbar/v3"
)

const prefix = "bench"

type Bencher struct {
log logger.Logger
conf *config.Config
}

func (bench *Bencher) RunBaseTest(ctx context.Context, b BaseTest) (err error) {
start := time.Now()
defer func() {
if r := recover(); r != nil {
bench.log.Error(r)
err = fmt.Errorf("internal error: %v", r)
}

Check warning on line 36 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L30-L36

Added lines #L30 - L36 were not covered by tests
}()
db, err := misc.NewDatabaseConnectionPool(context.Background(), bench.conf, stats.NOP, prefix)
if err != nil {
return fmt.Errorf("connect to db: %w", err)
}
if b.NumWriters == 0 {
b.NumWriters = bench.conf.GetInt("numWriters", 10)
}
if b.NumReaders == 0 {
b.NumReaders = bench.conf.GetInt("numReaders", 10)
}
if b.NumJobsPerTopic == 0 {
b.NumJobsPerTopic = bench.conf.GetInt("numJobsPerTopic", 10000)
}
if b.EventSize == 0 {
b.EventSize = bench.conf.GetInt("eventSize", 1536)
}
if b.EventsPickedUpPerQuery == 0 {
b.EventsPickedUpPerQuery = bench.conf.GetInt("eventsPickedUpPerQuery", 2000)
}
bench.log.Infon(
"base test config",
logger.NewIntField("numWriters", int64(b.NumWriters)),
logger.NewIntField("numReaders", int64(b.NumReaders)),
logger.NewIntField("numJobsPerTopic", int64(b.NumJobsPerTopic)),
logger.NewIntField("eventSize", int64(b.EventSize)),
logger.NewIntField("EventsPickedUpPerQuery", int64(b.EventsPickedUpPerQuery)),
logger.NewIntField("failure%", int64(b.FailurePercent)),
)

jd := jobsdb.NewForReadWrite(
prefix,
jobsdb.WithClearDB(b.ClearDB),
jobsdb.WithDBHandle(db),
jobsdb.WithSkipMaintenanceErr(config.GetBool("jobsDB.skipMaintenanceError", false)),
)
defer jd.TearDown()
if err := jd.Start(); err != nil {
return fmt.Errorf("start jobsdb: %w", err)
}
totalJobs := int(float64(b.NumJobsPerTopic)/float64(b.NumWriters)) * b.NumReaders * b.NumWriters
readProgressBar := progressbar.Default(int64(totalJobs))
g, ctx := kitsync.NewEagerGroup(ctx, 2)
writesDone := make(chan struct{})
g.Go(func() error {
defer close(writesDone)
writeGroup, ctx := kitsync.NewEagerGroup(ctx, b.NumWriters)
for i := 0; i < b.NumWriters; i++ {
writeGroup.Go(func() error {
start := time.Now()
bench.log.Debugw("started writer", "num", i)
defer func() {
bench.log.Debugn("closed writer",
logger.NewIntField("num", int64(i)),
logger.NewDurationField("duration", time.Since(start)),
)
}()
for j := 0; j < b.NumReaders; j++ {
jobs := createJobs(
"source_"+strconv.Itoa(j),
int(float64(b.NumJobsPerTopic)/float64(b.NumWriters)),
b.EventSize,
)
if err := jd.Store(ctx, jobs); err != nil {
return fmt.Errorf("storing jobs: %w", err)
}
bench.log.Debugn("jobs stored",
logger.NewIntField("num", int64(len(jobs))),
logger.NewStringField("reader", "source_"+strconv.Itoa(j)),
)

Check warning on line 106 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L38-L106

Added lines #L38 - L106 were not covered by tests

}
return nil

Check warning on line 109 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L109

Added line #L109 was not covered by tests
})
}
return writeGroup.Wait()

Check warning on line 112 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L112

Added line #L112 was not covered by tests
})
g.Go(func() error {
readGroup, ctx := kitsync.NewEagerGroup(ctx, b.NumReaders)
for i := 0; i < b.NumReaders; i++ {
readGroup.Go(func() error {
start := time.Now()
bench.log.Debugw("started reader", "num", i)
defer func() {
bench.log.Debugn(
"closed reader",
logger.NewIntField("num", int64(i)),
logger.NewDurationField("readerDuration", time.Since(start)),
)
}()
sourceID := "source_" + strconv.Itoa(i)
timer := time.NewTicker(10 * time.Millisecond)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:

Check warning on line 133 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L114-L133

Added lines #L114 - L133 were not covered by tests
}
jobResult, err := jd.GetToProcess(
ctx,
jobsdb.GetQueryParams{
CustomValFilters: []string{"GW"},
ParameterFilters: []jobsdb.ParameterFilterT{
{Name: "source_id", Value: sourceID},
},
JobsLimit: b.EventsPickedUpPerQuery,
EventsLimit: b.EventsPickedUpPerQuery,
},
nil,
)
if err != nil {
return fmt.Errorf("getJobs - %s: %w", sourceID, err)
}
jobs := jobResult.Jobs
if len(jobs) == 0 {
select {
case <-writesDone:
return nil
default:
continue

Check warning on line 156 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L135-L156

Added lines #L135 - L156 were not covered by tests
}
}
// mark jobs as executing
if err := bench.markExecuting(ctx, jd, jobs); err != nil {
return fmt.Errorf("mark executing - %s: %w", sourceID, err)
}

Check warning on line 162 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L160-L162

Added lines #L160 - L162 were not covered by tests

// perform some work
// sleep somewhere between 10-100 ms
randTime := rand.IntN(90) + 10
time.Sleep(time.Duration(randTime) * time.Millisecond)

toFail := int(float64(b.FailurePercent*len(jobs)) / 100)
statusList := make([]*jobsdb.JobStatusT, 0, len(jobs))
fail := 0
for j := 0; j < len(jobs); j++ {
jobState := jobsdb.Succeeded.State
if j < toFail { // simply fail first `toFail` jobs
// only if first attempt
if jobs[j].LastJobStatus.AttemptNum == 0 {
jobState = jobsdb.Failed.State
fail++
}

Check warning on line 179 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L166-L179

Added lines #L166 - L179 were not covered by tests
}
statusList = append(statusList, &jobsdb.JobStatusT{
JobID: jobs[j].JobID,
AttemptNum: jobs[j].LastJobStatus.AttemptNum + 1,
JobState: jobState,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "",
ErrorResponse: []byte(`{}`),
Parameters: []byte(`{}`),
JobParameters: jobs[j].Parameters,
WorkspaceId: jobs[j].WorkspaceId,
})

Check warning on line 192 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L181-L192

Added lines #L181 - L192 were not covered by tests
}
if err := jd.UpdateJobStatus(ctx, statusList, []string{"GW"}, []jobsdb.ParameterFilterT{
{Name: "source_id", Value: sourceID},
}); err != nil {
return fmt.Errorf("update status: %w", err)
}
readProgressBar.Add(len(jobs) - fail)

Check failure on line 199 in jobsdb/cmd/bench/cli.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `readProgressBar.Add` is not checked (errcheck)

Check warning on line 199 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L194-L199

Added lines #L194 - L199 were not covered by tests
}
})
}
return readGroup.Wait()

Check warning on line 203 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L203

Added line #L203 was not covered by tests
})

if err := g.Wait(); err != nil {
return err
}
bench.log.Infon(
"done",
logger.NewDurationField("duration", time.Since(start)),
)
return nil

Check warning on line 213 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L206-L213

Added lines #L206 - L213 were not covered by tests
}

// BaseTest holds the options of the base benchmark run by Bencher.RunBaseTest.
// Fields documented with a config key are filled from configuration by
// RunBaseTest when left at zero.
type BaseTest struct {
	// ClearDB is passed to jobsdb.WithClearDB when the jobsdb is created.
	ClearDB bool
	// NumWriters is the number of concurrent writer goroutines
	// (config key "numWriters", default 10).
	NumWriters int
	// NumReaders is the number of concurrent reader goroutines and also the
	// number of sources ("source_<n>") every writer stores jobs for
	// (config key "numReaders", default 10).
	NumReaders int
	// NumJobsPerTopic is the number of jobs per source; it is divided by
	// NumWriters to get each writer's batch size
	// (config key "numJobsPerTopic", default 10000).
	NumJobsPerTopic int
	// EventSize controls the payload size: the payload carries two random
	// strings of EventSize/2 characters each
	// (config key "eventSize", default 1536).
	EventSize int
	// FailurePercent is the percentage of each fetched batch that readers mark
	// as failed on the jobs' first attempt. It has no configuration fallback.
	FailurePercent int
	// EventsPickedUpPerQuery is used as both JobsLimit and EventsLimit of each
	// reader query (config key "eventsPickedUpPerQuery", default 2000).
	EventsPickedUpPerQuery int
}

func createJobs(sourceID string, numEvents, eventSize int) []*jobsdb.JobT {
jobs := make([]*jobsdb.JobT, 0, numEvents)
samplePayload := &bytes.Buffer{}
samplePayload.WriteString(`{"field1": "`)
samplePayload.WriteString(trand.String(eventSize / 2))
samplePayload.WriteString(`", "field2": "`)
samplePayload.WriteString(trand.String(eventSize / 2))
samplePayload.WriteString(`"}`)
payload := samplePayload.Bytes()
for i := 0; i < numEvents; i++ {
jobs = append(jobs, &jobsdb.JobT{
UUID: uuid.New(),
UserID: trand.String(10),
CustomVal: "GW",
EventCount: 1,
EventPayload: payload,
Parameters: json.RawMessage(parameters(sourceID)),
WorkspaceId: sourceID,
})
}
return jobs

Check warning on line 246 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L226-L246

Added lines #L226 - L246 were not covered by tests
}

// parameters returns the job parameters JSON document for sourceID, in the
// form {"source_id": "<sourceID>"}.
//
// The ID is encoded as a JSON string, so quotes, backslashes and control
// characters in sourceID are escaped and the result is always valid JSON.
func parameters(sourceID string) string {
	// Marshalling a plain string cannot fail, so the error is safely ignored.
	quoted, _ := json.Marshal(sourceID)
	return `{"source_id": ` + string(quoted) + `}`
}

func (bencher *Bencher) markExecuting(ctx context.Context, jd *jobsdb.Handle, jobs []*jobsdb.JobT) error {
start := time.Now()
statusList := make([]*jobsdb.JobStatusT, len(jobs))
for i, job := range jobs {
statusList[i] = &jobsdb.JobStatusT{
JobID: job.JobID,
AttemptNum: job.LastJobStatus.AttemptNum,
JobState: jobsdb.Executing.State,
ExecTime: time.Now(),
RetryTime: time.Now(),
ErrorCode: "",
ErrorResponse: []byte(`{}`),
Parameters: []byte(`{}`),
JobParameters: job.Parameters,
WorkspaceId: job.WorkspaceId,
}
}
if err := jd.UpdateJobStatus(ctx, statusList, []string{"GW"}, nil); err != nil {
return err
}
bencher.log.Debugn(
"marked executed",
logger.NewIntField("num", int64(len(statusList))),
logger.NewDurationField("duration", time.Since(start)),
)
return nil

Check warning on line 278 in jobsdb/cmd/bench/cli.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/cmd/bench/cli.go#L253-L278

Added lines #L253 - L278 were not covered by tests
}
Loading

0 comments on commit fe9d001

Please sign in to comment.