diff --git a/go.mod b/go.mod index 16c23472a7..492522d5a4 100644 --- a/go.mod +++ b/go.mod @@ -83,6 +83,7 @@ require ( github.com/rudderlabs/sql-tunnels v0.1.7 github.com/rudderlabs/sqlconnect-go v1.13.0 github.com/samber/lo v1.47.0 + github.com/schollz/progressbar/v3 v3.17.1 github.com/segmentio/go-hll v1.0.1 github.com/segmentio/kafka-go v0.4.47 github.com/segmentio/ksuid v1.0.4 @@ -118,6 +119,7 @@ require ( github.com/apache/arrow-go/v18 v18.0.0 // indirect github.com/apache/arrow/go/v16 v16.0.0 // indirect github.com/dgraph-io/ristretto/v2 v2.0.0 // indirect + github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/spf13/viper v1.19.0 // indirect ) @@ -342,7 +344,7 @@ require ( golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.32.0 // indirect golang.org/x/sys v0.28.0 // indirect - golang.org/x/term v0.25.0 // indirect + golang.org/x/term v0.26.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.8.0 golang.org/x/tools v0.26.0 // indirect diff --git a/go.sum b/go.sum index 822fff31a7..fb42c458d6 100644 --- a/go.sum +++ b/go.sum @@ -374,6 +374,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chengxilo/virtualterm v1.0.4 h1:Z6IpERbRVlfB8WkOmtbHiDbBANU7cimRIof7mk9/PwM= +github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0iwWCOK1q10rlY= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -1010,6 +1012,8 @@ github.com/minio/minio-go/v7 v7.0.34/go.mod h1:nCrRzjoSUQh8hgKKtu3Y708OLvRLtuASM github.com/minio/minio-go/v7 v7.0.81 h1:SzhMN0TQ6T/xSBu6Nvw3M5M8voM+Ht8RH3hE8S7zxaA= github.com/minio/minio-go/v7 v7.0.81/go.mod h1:84gmIilaX4zcvAWWzJ5Z1WI5axN+hAbM5w25xf8xvC0= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= @@ -1193,6 +1197,8 @@ github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWR github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/schollz/progressbar/v3 v3.17.1 h1:bI1MTaoQO+v5kzklBjYNRQLoVpe0zbyRZNK6DFkVC5U= +github.com/schollz/progressbar/v3 v3.17.1/go.mod h1:RzqpnsPQNjUyIgdglUjRLgD7sVnxN1wpmBMV+UiEbL4= github.com/scylladb/gocql v1.14.4 h1:MhevwCfyAraQ6RvZYFO3pF4Lt0YhvQlfg8Eo2HEqVQA= github.com/scylladb/gocql v1.14.4/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0= github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= @@ -1628,8 +1634,8 @@ golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= -golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= -golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU= +golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E= golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/jobsdb/cmd/bench/Dockerfile b/jobsdb/cmd/bench/Dockerfile new file mode 100644 index 0000000000..f93f8e14e9 --- /dev/null +++ b/jobsdb/cmd/bench/Dockerfile @@ -0,0 +1,15 @@ +# syntax=docker/dockerfile:1 + +# GO_VERSION is updated automatically to match go.mod, see Makefile +ARG GO_VERSION=1.23.3 +ARG ALPINE_VERSION=3.20 +FROM golang:${GO_VERSION}-alpine${ALPINE_VERSION} AS builder +RUN mkdir /app +WORKDIR /app +COPY . . +RUN go mod download && go build -o benchJobsdb ./jobsdb/cmd/bench/. + +FROM alpine:${ALPINE_VERSION} +WORKDIR /root/ +COPY --from=builder /app/benchJobsdb ./ +CMD ['cat', '/dev/stdout'] \ No newline at end of file diff --git a/jobsdb/cmd/bench/cli.go b/jobsdb/cmd/bench/cli.go new file mode 100644 index 0000000000..78f13a28b0 --- /dev/null +++ b/jobsdb/cmd/bench/cli.go @@ -0,0 +1,279 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math/rand/v2" + "strconv" + "time" + + "github.com/google/uuid" + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" + kitsync "github.com/rudderlabs/rudder-go-kit/sync" + trand "github.com/rudderlabs/rudder-go-kit/testhelper/rand" + "github.com/rudderlabs/rudder-server/jobsdb" + "github.com/rudderlabs/rudder-server/utils/misc" + "github.com/schollz/progressbar/v3" +) + +const prefix = "bench" + +type Bencher struct { + log logger.Logger + conf *config.Config +} + +func (bench *Bencher) RunBaseTest(ctx context.Context, b BaseTest) (err error) { + start := time.Now() + defer func() { + if r := recover(); r != nil { + bench.log.Error(r) + err = fmt.Errorf("internal error: %v", r) + } + }() + db, err := misc.NewDatabaseConnectionPool(context.Background(), bench.conf, stats.NOP, prefix) + if err != nil { + return fmt.Errorf("connect to db: %w", err) + } + if b.NumWriters == 0 { + b.NumWriters = bench.conf.GetInt("numWriters", 10) + } + if b.NumReaders == 0 { + b.NumReaders = bench.conf.GetInt("numReaders", 10) + } + if b.NumJobsPerTopic == 0 { + b.NumJobsPerTopic = bench.conf.GetInt("numJobsPerTopic", 10000) + } + if b.EventSize == 0 { + b.EventSize = bench.conf.GetInt("eventSize", 1536) + } + if b.EventsPickedUpPerQuery == 0 { + b.EventsPickedUpPerQuery = bench.conf.GetInt("eventsPickedUpPerQuery", 2000) + } + bench.log.Infon( + "base test config", + logger.NewIntField("numWriters", int64(b.NumWriters)), + logger.NewIntField("numReaders", int64(b.NumReaders)), + logger.NewIntField("numJobsPerTopic", int64(b.NumJobsPerTopic)), + logger.NewIntField("eventSize", int64(b.EventSize)), + logger.NewIntField("EventsPickedUpPerQuery", int64(b.EventsPickedUpPerQuery)), + logger.NewIntField("failure%", int64(b.FailurePercent)), + ) + + jd := jobsdb.NewForReadWrite( + prefix, + jobsdb.WithClearDB(b.ClearDB), + jobsdb.WithDBHandle(db), + jobsdb.WithSkipMaintenanceErr(config.GetBool("jobsDB.skipMaintenanceError", false)), + ) + defer jd.TearDown() + if err := jd.Start(); err != nil { + return fmt.Errorf("start jobsdb: %w", err) + } + totalJobs := int(float64(b.NumJobsPerTopic)/float64(b.NumWriters)) * b.NumReaders * b.NumWriters + readProgressBar := progressbar.Default(int64(totalJobs)) + g, ctx := kitsync.NewEagerGroup(ctx, 2) + writesDone := make(chan struct{}) + g.Go(func() error { + defer close(writesDone) + writeGroup, ctx := kitsync.NewEagerGroup(ctx, b.NumWriters) + for i := 0; i < b.NumWriters; i++ { + writeGroup.Go(func() error { + start := time.Now() + bench.log.Debugw("started writer", "num", i) + defer func() { + bench.log.Debugn("closed writer", + logger.NewIntField("num", int64(i)), + logger.NewDurationField("duration", time.Since(start)), + ) + }() + for j := 0; j < b.NumReaders; j++ { + jobs := createJobs( + "source_"+strconv.Itoa(j), + int(float64(b.NumJobsPerTopic)/float64(b.NumWriters)), + b.EventSize, + ) + if err := jd.Store(ctx, jobs); err != nil { + return fmt.Errorf("storing jobs: %w", err) + } + bench.log.Debugn("jobs stored", + logger.NewIntField("num", int64(len(jobs))), + logger.NewStringField("reader", "source_"+strconv.Itoa(j)), + ) + + } + return nil + }) + } + return writeGroup.Wait() + }) + g.Go(func() error { + readGroup, ctx := kitsync.NewEagerGroup(ctx, b.NumReaders) + for i := 0; i < b.NumReaders; i++ { + readGroup.Go(func() error { + start := time.Now() + bench.log.Debugw("started reader", "num", i) + defer func() { + bench.log.Debugn( + "closed reader", + logger.NewIntField("num", int64(i)), + logger.NewDurationField("readerDuration", time.Since(start)), + ) + }() + sourceID := "source_" + strconv.Itoa(i) + timer := time.NewTicker(10 * time.Millisecond) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + } + jobResult, err := jd.GetToProcess( + ctx, + jobsdb.GetQueryParams{ + CustomValFilters: []string{"GW"}, + ParameterFilters: []jobsdb.ParameterFilterT{ + {Name: "source_id", Value: sourceID}, + }, + JobsLimit: b.EventsPickedUpPerQuery, + EventsLimit: b.EventsPickedUpPerQuery, + }, + nil, + ) + if err != nil { + return fmt.Errorf("getJobs - %s: %w", sourceID, err) + } + jobs := jobResult.Jobs + if len(jobs) == 0 { + select { + case <-writesDone: + return nil + default: + continue + } + } + // mark jobs as executing + if err := bench.markExecuting(ctx, jd, jobs); err != nil { + return fmt.Errorf("mark executing - %s: %w", sourceID, err) + } + + // perform some work + // sleep somewhere between 10-100 ms + randTime := rand.IntN(90) + 10 + time.Sleep(time.Duration(randTime) * time.Millisecond) + + toFail := int(float64(b.FailurePercent*len(jobs)) / 100) + statusList := make([]*jobsdb.JobStatusT, 0, len(jobs)) + fail := 0 + for j := 0; j < len(jobs); j++ { + jobState := jobsdb.Succeeded.State + if j < toFail { // simply fail first `toFail` jobs + // only if first attempt + if jobs[j].LastJobStatus.AttemptNum == 0 { + jobState = jobsdb.Failed.State + fail++ + } + } + statusList = append(statusList, &jobsdb.JobStatusT{ + JobID: jobs[j].JobID, + AttemptNum: jobs[j].LastJobStatus.AttemptNum + 1, + JobState: jobState, + ExecTime: time.Now(), + RetryTime: time.Now(), + ErrorCode: "", + ErrorResponse: []byte(`{}`), + Parameters: []byte(`{}`), + JobParameters: jobs[j].Parameters, + WorkspaceId: jobs[j].WorkspaceId, + }) + } + if err := jd.UpdateJobStatus(ctx, statusList, []string{"GW"}, []jobsdb.ParameterFilterT{ + {Name: "source_id", Value: sourceID}, + }); err != nil { + return fmt.Errorf("update status: %w", err) + } + readProgressBar.Add(len(jobs) - fail) + } + }) + } + return readGroup.Wait() + }) + + if err := g.Wait(); err != nil { + return err + } + bench.log.Infon( + "done", + logger.NewDurationField("duration", time.Since(start)), + ) + return nil +} + +type BaseTest struct { + ClearDB bool + NumWriters int + NumReaders int + NumJobsPerTopic int + EventSize int + FailurePercent int + EventsPickedUpPerQuery int +} + +func createJobs(sourceID string, numEvents, eventSize int) []*jobsdb.JobT { + jobs := make([]*jobsdb.JobT, 0, numEvents) + samplePayload := &bytes.Buffer{} + samplePayload.WriteString(`{"field1": "`) + samplePayload.WriteString(trand.String(eventSize / 2)) + samplePayload.WriteString(`", "field2": "`) + samplePayload.WriteString(trand.String(eventSize / 2)) + samplePayload.WriteString(`"}`) + payload := samplePayload.Bytes() + for i := 0; i < numEvents; i++ { + jobs = append(jobs, &jobsdb.JobT{ + UUID: uuid.New(), + UserID: trand.String(10), + CustomVal: "GW", + EventCount: 1, + EventPayload: payload, + Parameters: json.RawMessage(parameters(sourceID)), + WorkspaceId: sourceID, + }) + } + return jobs +} + +func parameters(sourceID string) string { + return fmt.Sprintf(`{"source_id": "%s"}`, sourceID) +} + +func (bencher *Bencher) markExecuting(ctx context.Context, jd *jobsdb.Handle, jobs []*jobsdb.JobT) error { + start := time.Now() + statusList := make([]*jobsdb.JobStatusT, len(jobs)) + for i, job := range jobs { + statusList[i] = &jobsdb.JobStatusT{ + JobID: job.JobID, + AttemptNum: job.LastJobStatus.AttemptNum, + JobState: jobsdb.Executing.State, + ExecTime: time.Now(), + RetryTime: time.Now(), + ErrorCode: "", + ErrorResponse: []byte(`{}`), + Parameters: []byte(`{}`), + JobParameters: job.Parameters, + WorkspaceId: job.WorkspaceId, + } + } + if err := jd.UpdateJobStatus(ctx, statusList, []string{"GW"}, nil); err != nil { + return err + } + bencher.log.Debugn( + "marked executed", + logger.NewIntField("num", int64(len(statusList))), + logger.NewDurationField("duration", time.Since(start)), + ) + return nil +} diff --git a/jobsdb/cmd/bench/main.go b/jobsdb/cmd/bench/main.go new file mode 100644 index 0000000000..1acfbd797f --- /dev/null +++ b/jobsdb/cmd/bench/main.go @@ -0,0 +1,127 @@ +package main + +import ( + "context" + "os" + "os/signal" + "sort" + "syscall" + + "github.com/urfave/cli/v2" + + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" +) + +func main() { + tool := cli.NewApp() + tool.Name = "jobsdbBench" + tool.Description = "A command line benchmark tool for jobsdb" + log := logger.NewLogger().Child("jobsdb") + conf := config.New(config.WithEnvPrefix("jobsdb")) + b := &Bencher{ + conf: conf, + log: log, + } + var ( + // base test + + numReaders int + numWriters int + clearDB bool + numJobsPerReader int + eventSize int + failurePercentage int + eventsPickedUpPerQuery int + verbose bool + + // more vars for other cases below + ) + + tool.Commands = []*cli.Command{ + { + Name: "base", + Usage: "run a benchmark with m readers and n writers concurrently", + Description: "creates jobs of m sourceIDs for the writers to query. Readers query by sourceID", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "numReaders", + Usage: "number of concurrent readers - test will create as many sourceIDs", + Aliases: []string{"m"}, + Destination: &numReaders, + DefaultText: `config.GetInt("numReaders", 10)`, + }, + &cli.IntFlag{ + Name: "numWriters", + Usage: "number of concurrent writers", + Aliases: []string{"n"}, + Destination: &numWriters, + DefaultText: `config.GetInt("numWriters", 10)`, + }, + &cli.IntFlag{ + Name: "numJobsPerReader", + Usage: "number of jobs per sourceID/reader", + Aliases: []string{"j"}, + Destination: &numJobsPerReader, + DefaultText: `config.GetInt("numJobsPerReader", 10000)`, + }, + &cli.IntFlag{ + Name: "eventSize", + Usage: "payload size of the events in bytes(estimate)", + Aliases: []string{"e"}, + Destination: &eventSize, + DefaultText: `config.GetInt("eventSize", 1536)`, + }, + &cli.IntFlag{ + Name: "failurePercentage", + Usage: "percentage of events that fail on the first attempt, all pass on second attempt", + Aliases: []string{"f"}, + Destination: &failurePercentage, + DefaultText: `0`, + }, + &cli.IntFlag{ + Name: "eventsPickedUpPerQuery", + Usage: "number of events picked up per query", + Aliases: []string{"q"}, + Destination: &eventsPickedUpPerQuery, + DefaultText: `config.GetInt("eventsPickedUpPerQuery", 2000)`, + }, + &cli.BoolFlag{ + Name: "clearDB", + Usage: "cleardb before starting", + Aliases: []string{"c"}, + Destination: &clearDB, + DefaultText: `false`, + }, + &cli.BoolFlag{ + Name: "verbose", + Usage: "verbose benchmark script logging", + Aliases: []string{"v"}, + Destination: &verbose, + DefaultText: `false`, + }, + }, + Action: func(ctx *cli.Context) error { + if verbose { + logger.SetLogLevel("jobsdb", "DEBUG") + } + return b.RunBaseTest(ctx.Context, BaseTest{ + ClearDB: clearDB, + FailurePercent: failurePercentage, + NumWriters: numWriters, + NumReaders: numReaders, + NumJobsPerTopic: numJobsPerReader, + EventSize: eventSize, + EventsPickedUpPerQuery: eventsPickedUpPerQuery, + }) + }, + }, + } + + sort.Sort(cli.CommandsByName(tool.Commands)) + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + if err := tool.RunContext(ctx, os.Args); err != nil { + log.Fatal(err) + } +}