diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 000000000..116e98632 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,44 @@ +# Golang CircleCI 2.0 configuration file +# +# Check https://circleci.com/docs/2.0/language-go/ for more details +version: 2 +jobs: + build: # test with redisearch:latest + docker: + - image: circleci/golang:1.13 + + working_directory: /go/src/github.com/RedisTimeSeries/tsbs + steps: + - checkout + - run: | + make redistimeseries + make test-rts + +# +# build-multiarch-docker: +# machine: +# enabled: true +# steps: +# - checkout +# - run: | +# echo "$DOCKER_REDISBENCH_PWD" | base64 --decode | docker login --username $DOCKER_REDISBENCH_USER --password-stdin +# - run: +# name: Build +# command: | +# make docker-release +# no_output_timeout: 20m + +workflows: + version: 2 + build_and_package: + jobs: + - build: + filters: + tags: + only: /.*/ +# - build-multiarch-docker: +# filters: +# tags: +# only: /.*/ +# branches: +# only: master diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 000000000..290031ac2 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,24 @@ +# ignore .git and .cache folders +.git +.editorconfig +.gitignore +.dockerignore +.travis.yml +coverage.txt +AUTHORS.md +CONTRIBUTING.md +LICENSE +NOTICE +README.md + +*.out +*.log +.DS_Store +.idea +.vscode +**/bin +**/docs +**/scripts + +# High Dynamic Range (HDR) Histogram files +*.hdr diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 000000000..d09213273 --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,21 @@ +name: Go + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.15 + + - name: Test + run: go test -v -race ./... diff --git a/.gitignore b/.gitignore index 04dd8bed8..6e8eff6ef 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,17 @@ .DS_Store .idea .vscode +*~ + +bin # High Dynamic Range (HDR) Histogram files -*.hdr \ No newline at end of file +*.hdr + +/docs/responses +coverage.txt + +bin/ + +results/ +dist diff --git a/Makefile b/Makefile index df324a7d3..b060848a6 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,7 @@ GOTEST=$(GOCMD) test GOGET=$(GOCMD) get GOMOD=$(GOCMD) mod GOFMT=$(GOCMD) fmt +DISTDIR= ./dist .PHONY: all generators loaders runners lint fmt checkfmt @@ -25,7 +26,8 @@ loaders: tsbs_load \ tsbs_load_prometheus \ tsbs_load_siridb \ tsbs_load_timescaledb \ - tsbs_load_victoriametrics + tsbs_load_victoriametrics \ + tsbs_load_questdb runners: tsbs_run_queries_akumuli \ tsbs_run_queries_cassandra \ @@ -36,7 +38,8 @@ runners: tsbs_run_queries_akumuli \ tsbs_run_queries_siridb \ tsbs_run_queries_timescaledb \ tsbs_run_queries_timestream \ - tsbs_run_queries_victoriametrics + tsbs_run_queries_victoriametrics \ + tsbs_run_queries_questdb test: $(GOTEST) -v ./... @@ -64,3 +67,33 @@ lint: fmt: $(GOFMT) ./... + +release-redistimeseries: + $(GOGET) github.com/mitchellh/gox + $(GOGET) github.com/tcnksm/ghr + GO111MODULE=on gox -osarch "linux/amd64 darwin/amd64" -output "${DISTDIR}/tsbs_run_queries_redistimeseries_{{.OS}}_{{.Arch}}" ./cmd/tsbs_run_queries_redistimeseries + GO111MODULE=on gox -osarch "linux/amd64 darwin/amd64" -output "${DISTDIR}/tsbs_load_redistimeseries_{{.OS}}_{{.Arch}}" ./cmd/tsbs_load_redistimeseries + + +redistimeseries: tsbs_generate_data tsbs_generate_queries tsbs_load_redistimeseries tsbs_run_queries_redistimeseries + +publish-redistimeseries: release-redistimeseries + @for f in $(shell ls ${DISTDIR}); \ + do \ + echo "copying ${DISTDIR}/$${f}"; \ + aws s3 cp ${DISTDIR}/$${f} s3://benchmarks.redislabs/redistimeseries/tools/tsbs/$${f} --acl public-read; \ + done + +publish-redistimeseries-queries: + @for f in $(shell ls /tmp/bulk_queries); \ + do \ + echo "copying $${f}"; \ + aws s3 cp /tmp/bulk_queries/$${f} s3://benchmarks.redislabs/redistimeseries/tsbs/queries/devops/scale100/devops-scale100-4days/$${f} --acl public-read; \ + done + +publish-redistimeseries-data: + @for f in $(shell ls /tmp/bulk_data_redistimeseries); \ + do \ + echo "copying $${f}"; \ + aws s3 cp /tmp/bulk_data_redistimeseries/$${f} s3://benchmarks.redislabs/redistimeseries/tsbs/devops/bulk_data_redistimeseries/$${f} --acl public-read; \ + done \ No newline at end of file diff --git a/README.md b/README.md index 27f0be5f2..0ac8b93e4 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,8 @@ Current databases supported: + CrateDB [(supplemental docs)](docs/cratedb.md) + InfluxDB [(supplemental docs)](docs/influx.md) + MongoDB [(supplemental docs)](docs/mongo.md) ++ RedisTimeSeries [(supplemental docs)](docs/redistimeseries.md) ++ QuestDB [(supplemental docs)](docs/questdb.md) + SiriDB [(supplemental docs)](docs/siridb.md) + TimescaleDB [(supplemental docs)](docs/timescaledb.md) + Timestream [(supplemental docs)](docs/timestream.md) @@ -75,6 +77,8 @@ cases are implemented for each database: |CrateDB|X|| |InfluxDB|X|X| |MongoDB|X| +|RedisTimeSeries|X| +|QuestDB|X|X |SiriDB|X| |TimescaleDB|X|X| |Timestream|X|| @@ -132,7 +136,7 @@ Variables needed: 1. an end time. E.g., `2016-01-04T00:00:00Z` 1. how much time should be between each reading per device, in seconds. E.g., `10s` 1. and which database(s) you want to generate for. E.g., `timescaledb` - (choose from `cassandra`, `clickhouse`, `cratedb`, `influx`, `mongo`, `siridb`, + (choose from `cassandra`, `clickhouse`, `cratedb`, `influx`, `mongo`, `questdb`, `siridb`, `timescaledb` or `victoriametrics`) Given the above steps you can now generate a dataset (or multiple diff --git a/cmd/tsbs_generate_queries/databases/questdb/common.go b/cmd/tsbs_generate_queries/databases/questdb/common.go new file mode 100644 index 000000000..407555dba --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/questdb/common.go @@ -0,0 +1,50 @@ +package questdb + +import ( + "fmt" + "net/url" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/timescale/tsbs/pkg/query" +) + +// BaseGenerator contains settings specific for QuestDB +type BaseGenerator struct { +} + +// GenerateEmptyQuery returns an empty query.QuestDB. +func (g *BaseGenerator) GenerateEmptyQuery() query.Query { + return query.NewHTTP() +} + +// fillInQuery fills the query struct with data. +func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, sql string) { + v := url.Values{} + v.Set("count", "false") + v.Set("query", sql) + q := qi.(*query.HTTP) + q.HumanLabel = []byte(humanLabel) + q.RawQuery = []byte(sql) + q.HumanDescription = []byte(humanDesc) + q.Method = []byte("GET") + q.Path = []byte(fmt.Sprintf("/exec?%s", v.Encode())) + q.Body = nil +} + +// NewDevops creates a new devops use case query generator. +func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) { + core, err := devops.NewCore(start, end, scale) + + if err != nil { + return nil, err + } + + devops := &Devops{ + BaseGenerator: g, + Core: core, + } + + return devops, nil +} diff --git a/cmd/tsbs_generate_queries/databases/questdb/devops.go b/cmd/tsbs_generate_queries/databases/questdb/devops.go new file mode 100644 index 000000000..ca1b3f9cc --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/questdb/devops.go @@ -0,0 +1,210 @@ +package questdb + +import ( + "fmt" + "strings" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/pkg/query" +) + +// TODO: Remove the need for this by continuing to bubble up errors +func panicIfErr(err error) { + if err != nil { + panic(err.Error()) + } +} + +// Devops produces QuestDB-specific queries for all the devops query types. +type Devops struct { + *BaseGenerator + *devops.Core +} + +// getSelectAggClauses builds specified aggregate function clauses for +// a set of column idents. +// +// For instance: +// max(cpu_time) AS max_cpu_time +func (d *Devops) getSelectAggClauses(aggFunc string, idents []string) []string { + selectAggClauses := make([]string, len(idents)) + for i, ident := range idents { + selectAggClauses[i] = + fmt.Sprintf("%[1]s(%[2]s) AS %[1]s_%[2]s", aggFunc, ident) + } + return selectAggClauses +} + +// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for N random +// hosts +// +// Queries: +// cpu-max-all-1 +// cpu-max-all-8 +func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { + interval := d.Interval.MustRandWindow(devops.MaxAllDuration) + selectClauses := d.getSelectAggClauses("max", devops.GetAllCPUMetrics()) + hosts, err := d.GetRandomHosts(nHosts) + panicIfErr(err) + + sql := fmt.Sprintf(` + SELECT + hour(timestamp) AS hour, + %s + FROM cpu + WHERE hostname IN ('%s') + AND timestamp >= '%s' + AND timestamp < '%s' + SAMPLE BY 1h`, + strings.Join(selectClauses, ", "), + strings.Join(hosts, "', '"), + interval.StartString(), + interval.EndString()) + + humanLabel := devops.GetMaxAllLabel("QuestDB", nHosts) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// GroupByTimeAndPrimaryTag selects the AVG of metrics in the group `cpu` per device +// per hour for a day +// +// Queries: +// double-groupby-1 +// double-groupby-5 +// double-groupby-all +func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) + selectClauses := d.getSelectAggClauses("avg", metrics) + + sql := fmt.Sprintf(` + SELECT timestamp, hostname, + %s + FROM cpu + WHERE timestamp >= '%s' + AND timestamp < '%s' + SAMPLE BY 1h + GROUP BY timestamp, hostname`, + strings.Join(selectClauses, ", "), + interval.StartString(), + interval.EndString()) + + humanLabel := devops.GetDoubleGroupByLabel("QuestDB", numMetrics) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// GroupByOrderByLimit populates a query.Query that has a time WHERE clause, +// that groups by a truncated date, orders by that date, and takes a limit: +// +// Queries: +// groupby-orderby-limit +func (d *Devops) GroupByOrderByLimit(qi query.Query) { + interval := d.Interval.MustRandWindow(time.Hour) + sql := fmt.Sprintf(` + SELECT timestamp AS minute, + max(usage_user) + FROM cpu + WHERE timestamp < '%s' + SAMPLE BY 1m + LIMIT 5`, + interval.EndString()) + + humanLabel := "QuestDB max cpu over last 5 min-intervals (random end)" + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// LastPointPerHost finds the last row for every host in the dataset +// +// Queries: +// lastpoint +func (d *Devops) LastPointPerHost(qi query.Query) { + sql := fmt.Sprintf(`SELECT * FROM cpu latest by hostname`) + + humanLabel := "QuestDB last row per host" + humanDesc := humanLabel + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// HighCPUForHosts populates a query that gets CPU metrics when the CPU has +// high usage between a time period for a number of hosts (if 0, it will +// search all hosts) +// +// Queries: +// high-cpu-1 +// high-cpu-all +func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) + sql := "" + if nHosts > 0 { + hosts, err := d.GetRandomHosts(nHosts) + panicIfErr(err) + + sql = fmt.Sprintf(` + SELECT * + FROM cpu + WHERE usage_user > 90.0 + AND hostname IN ('%s') + AND timestamp >= '%s' + AND timestamp < '%s'`, + strings.Join(hosts, "', '"), + interval.StartString(), + interval.EndString()) + } else { + sql = fmt.Sprintf(` + SELECT * + FROM cpu + WHERE usage_user > 90.0 + AND timestamp >= '%s' + AND timestamp < '%s'`, + interval.StartString(), + interval.EndString()) + } + + humanLabel, err := devops.GetHighCPULabel("QuestDB", nHosts) + panicIfErr(err) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} + +// GroupByTime selects the MAX for metrics under 'cpu', per minute for N random +// hosts +// +// Resultsets: +// single-groupby-1-1-12 +// single-groupby-1-1-1 +// single-groupby-1-8-1 +// single-groupby-5-1-12 +// single-groupby-5-1-1 +// single-groupby-5-8-1 +func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { + interval := d.Interval.MustRandWindow(timeRange) + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + selectClauses := d.getSelectAggClauses("max", metrics) + hosts, err := d.GetRandomHosts(nHosts) + panicIfErr(err) + + sql := fmt.Sprintf(` + SELECT timestamp, + %s + FROM cpu + WHERE hostname IN ('%s') + AND timestamp >= '%s' + AND timestamp < '%s' + SAMPLE BY 1m`, + strings.Join(selectClauses, ", "), + strings.Join(hosts, "', '"), + interval.StartString(), + interval.EndString()) + + humanLabel := fmt.Sprintf( + "QuestDB %d cpu metric(s), random %4d hosts, random %s by 1m", + numMetrics, nHosts, timeRange) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQuery(qi, humanLabel, humanDesc, sql) +} diff --git a/cmd/tsbs_generate_queries/databases/questdb/devops_test.go b/cmd/tsbs_generate_queries/databases/questdb/devops_test.go new file mode 100644 index 000000000..ae1f3792f --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/questdb/devops_test.go @@ -0,0 +1,298 @@ +package questdb + +import ( + "math/rand" + "net/url" + "regexp" + "testing" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/pkg/query" +) + +func TestDevopsGroupByTime(t *testing.T) { + expectedHumanLabel := "QuestDB 1 cpu metric(s), random 1 hosts, random 1s by 1m" + expectedHumanDesc := "QuestDB 1 cpu metric(s), random 1 hosts, random 1s by 1m: 1970-01-01T00:05:58Z" + expectedQuery := "SELECT timestamp, max(usage_user) AS max_usage_user FROM cpu " + + "WHERE hostname IN ('host_9') AND timestamp >= '1970-01-01T00:05:58Z' AND timestamp < '1970-01-01T00:05:59Z' SAMPLE BY 1m" + + rand.Seed(123) // Setting seed for testing purposes. + s := time.Unix(0, 0) + e := s.Add(time.Hour) + b := BaseGenerator{} + dq, err := b.NewDevops(s, e, 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + metrics := 1 + nHosts := 1 + duration := time.Second + + q := d.GenerateEmptyQuery() + d.GroupByTime(q, nHosts, metrics, duration) + + verifyQuery(t, q, expectedHumanLabel, expectedHumanDesc, expectedQuery) +} + +func TestDevopsGroupByOrderByLimit(t *testing.T) { + expectedHumanLabel := "QuestDB max cpu over last 5 min-intervals (random end)" + expectedHumanDesc := "QuestDB max cpu over last 5 min-intervals (random end): 1970-01-01T01:16:22Z" + expectedQuery := "SELECT timestamp AS minute, max(usage_user) FROM cpu " + + "WHERE timestamp < '1970-01-01T01:16:22Z' SAMPLE BY 1m LIMIT 5" + + rand.Seed(123) // Setting seed for testing purposes. + s := time.Unix(0, 0) + e := s.Add(2 * time.Hour) + b := BaseGenerator{} + dq, err := b.NewDevops(s, e, 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + q := d.GenerateEmptyQuery() + d.GroupByOrderByLimit(q) + + verifyQuery(t, q, expectedHumanLabel, expectedHumanDesc, expectedQuery) +} + +func TestDevopsGroupByTimeAndPrimaryTag(t *testing.T) { + cases := []testCase{ + { + desc: "zero metrics", + input: 0, + fail: true, + failMsg: "cannot get 0 metrics", + }, + { + desc: "1 metric", + input: 1, + expectedHumanLabel: "QuestDB mean of 1 metrics, all hosts, random 12h0m0s by 1h", + expectedHumanDesc: "QuestDB mean of 1 metrics, all hosts, random 12h0m0s by 1h: 1970-01-01T00:16:22Z", + expectedQuery: "SELECT timestamp, hostname, avg(usage_user) AS avg_usage_user FROM cpu " + + "WHERE timestamp >= '1970-01-01T00:16:22Z' AND timestamp < '1970-01-01T12:16:22Z' " + + "SAMPLE BY 1h GROUP BY timestamp, hostname", + }, + { + desc: "5 metrics", + input: 5, + expectedHumanLabel: "QuestDB mean of 5 metrics, all hosts, random 12h0m0s by 1h", + expectedHumanDesc: "QuestDB mean of 5 metrics, all hosts, random 12h0m0s by 1h: 1970-01-01T00:54:10Z", + expectedQuery: "SELECT timestamp, hostname, avg(usage_user) AS avg_usage_user, avg(usage_system) AS avg_usage_system, avg(usage_idle) AS avg_usage_idle, avg(usage_nice) AS avg_usage_nice, avg(usage_iowait) AS avg_usage_iowait FROM cpu " + + "WHERE timestamp >= '1970-01-01T00:54:10Z' AND timestamp < '1970-01-01T12:54:10Z' " + + "SAMPLE BY 1h GROUP BY timestamp, hostname", + }, + } + + testFunc := func(d *Devops, c testCase) query.Query { + q := d.GenerateEmptyQuery() + d.GroupByTimeAndPrimaryTag(q, c.input) + return q + } + + start := time.Unix(0, 0) + end := start.Add(devops.DoubleGroupByDuration).Add(time.Hour) + + runTestCases(t, testFunc, start, end, cases) +} + +func TestMaxAllCPU(t *testing.T) { + cases := []testCase{ + { + desc: "zero hosts", + input: 0, + fail: true, + failMsg: "number of hosts cannot be < 1; got 0", + }, + { + desc: "1 host", + input: 1, + expectedHumanLabel: "QuestDB max of all CPU metrics, random 1 hosts, random 8h0m0s by 1h", + expectedHumanDesc: "QuestDB max of all CPU metrics, random 1 hosts, random 8h0m0s by 1h: 1970-01-01T00:54:10Z", + expectedQuery: "SELECT hour(timestamp) AS hour, max(usage_user) AS max_usage_user, max(usage_system) AS max_usage_system, max(usage_idle) AS max_usage_idle, max(usage_nice) AS max_usage_nice, max(usage_iowait) AS max_usage_iowait, max(usage_irq) AS max_usage_irq, max(usage_softirq) AS max_usage_softirq, max(usage_steal) AS max_usage_steal, max(usage_guest) AS max_usage_guest, max(usage_guest_nice) AS max_usage_guest_nice FROM cpu " + + "WHERE hostname IN ('host_3') AND timestamp >= '1970-01-01T00:54:10Z' AND timestamp < '1970-01-01T08:54:10Z' " + + "SAMPLE BY 1h", + }, + { + desc: "5 hosts", + input: 5, + expectedHumanLabel: "QuestDB max of all CPU metrics, random 5 hosts, random 8h0m0s by 1h", + expectedHumanDesc: "QuestDB max of all CPU metrics, random 5 hosts, random 8h0m0s by 1h: 1970-01-01T00:37:12Z", + expectedQuery: "SELECT hour(timestamp) AS hour, max(usage_user) AS max_usage_user, max(usage_system) AS max_usage_system, max(usage_idle) AS max_usage_idle, max(usage_nice) AS max_usage_nice, max(usage_iowait) AS max_usage_iowait, max(usage_irq) AS max_usage_irq, max(usage_softirq) AS max_usage_softirq, max(usage_steal) AS max_usage_steal, max(usage_guest) AS max_usage_guest, max(usage_guest_nice) AS max_usage_guest_nice FROM cpu " + + "WHERE hostname IN ('host_9', 'host_5', 'host_1', 'host_7', 'host_2') AND timestamp >= '1970-01-01T00:37:12Z' AND timestamp < '1970-01-01T08:37:12Z' " + + "SAMPLE BY 1h", + }, + } + + testFunc := func(d *Devops, c testCase) query.Query { + q := d.GenerateEmptyQuery() + d.MaxAllCPU(q, c.input) + return q + } + + start := time.Unix(0, 0) + end := start.Add(devops.MaxAllDuration).Add(time.Hour) + + runTestCases(t, testFunc, start, end, cases) +} + +func TestLastPointPerHost(t *testing.T) { + expectedHumanLabel := "QuestDB last row per host" + expectedHumanDesc := "QuestDB last row per host" + expectedQuery := `SELECT * FROM cpu latest by hostname` + + rand.Seed(123) // Setting seed for testing purposes. + s := time.Unix(0, 0) + e := s.Add(2 * time.Hour) + b := BaseGenerator{} + dq, err := b.NewDevops(s, e, 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + q := d.GenerateEmptyQuery() + d.LastPointPerHost(q) + + verifyQuery(t, q, expectedHumanLabel, expectedHumanDesc, expectedQuery) +} + +func TestHighCPUForHosts(t *testing.T) { + cases := []testCase{ + { + desc: "negative hosts", + input: -1, + fail: true, + failMsg: "nHosts cannot be negative", + }, + { + desc: "zero hosts", + input: 0, + expectedHumanLabel: "QuestDB CPU over threshold, all hosts", + expectedHumanDesc: "QuestDB CPU over threshold, all hosts: 1970-01-01T00:54:10Z", + expectedQuery: "SELECT * FROM cpu " + + "WHERE usage_user > 90.0 AND " + + "timestamp >= '1970-01-01T00:54:10Z' AND timestamp < '1970-01-01T12:54:10Z'", + }, + { + desc: "1 host", + input: 1, + expectedHumanLabel: "QuestDB CPU over threshold, 1 host(s)", + expectedHumanDesc: "QuestDB CPU over threshold, 1 host(s): 1970-01-01T00:47:30Z", + expectedQuery: "SELECT * FROM cpu " + + "WHERE usage_user > 90.0 AND hostname IN ('host_5') AND " + + "timestamp >= '1970-01-01T00:47:30Z' AND timestamp < '1970-01-01T12:47:30Z'", + }, + { + desc: "5 hosts", + input: 5, + expectedHumanLabel: "QuestDB CPU over threshold, 5 host(s)", + expectedHumanDesc: "QuestDB CPU over threshold, 5 host(s): 1970-01-01T00:17:45Z", + expectedQuery: "SELECT * FROM cpu " + + "WHERE usage_user > 90.0 AND " + + "hostname IN ('host_9', 'host_5', 'host_1', 'host_7', 'host_2') AND " + + "timestamp >= '1970-01-01T00:17:45Z' AND timestamp < '1970-01-01T12:17:45Z'", + }, + } + + testFunc := func(d *Devops, c testCase) query.Query { + q := d.GenerateEmptyQuery() + d.HighCPUForHosts(q, c.input) + return q + } + + start := time.Unix(0, 0) + end := start.Add(devops.HighCPUDuration).Add(time.Hour) + + runTestCases(t, testFunc, start, end, cases) +} + +type testCase struct { + desc string + input int + fail bool + failMsg string + expectedHumanLabel string + expectedHumanDesc string + expectedQuery string +} + +func runTestCases(t *testing.T, testFunc func(*Devops, testCase) query.Query, s time.Time, e time.Time, cases []testCase) { + rand.Seed(123) // Setting seed for testing purposes. + + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + b := BaseGenerator{} + dq, err := b.NewDevops(s, e, 10) + if err != nil { + t.Fatalf("Error while creating devops generator") + } + d := dq.(*Devops) + + if c.fail { + func() { + defer func() { + r := recover() + if r == nil { + t.Errorf("did not panic when should") + } + + if r != c.failMsg { + t.Fatalf("incorrect fail message: got %s, want %s", r, c.failMsg) + } + }() + + testFunc(d, c) + }() + } else { + q := testFunc(d, c) + verifyQuery(t, q, c.expectedHumanLabel, c.expectedHumanDesc, c.expectedQuery) + } + + }) + } +} + +func verifyQuery(t *testing.T, q query.Query, humanLabel, humanDesc, expectedSql string) { + sql, ok := q.(*query.HTTP) + + if !ok { + t.Fatal("Filled query is not *query.HTTP type") + } + + if got := string(sql.HumanLabel); got != humanLabel { + t.Errorf("incorrect human label:\ngot\n%s\nwant\n%s", got, humanLabel) + } + + if got := string(sql.HumanDescription); got != humanDesc { + t.Errorf("incorrect human description:\ngot\n%s\nwant\n%s", got, humanDesc) + } + + if got := string(sql.Method); got != "GET" { + t.Errorf("incorrect method:\ngot\n%s\nwant GET", got) + } + + uri := string(sql.Path) + u, err := url.Parse(uri) + if err != nil { + t.Errorf("Failed to decode %s: %s", uri, err) + } + actualSql := normaliseField(u.Query()["query"][0]) + + if expectedSql != actualSql { + t.Errorf("expcted %s, actual %s", expectedSql, actualSql) + } +} + +func normaliseField(fieldValue string) string { + m1 := regexp.MustCompile("^\\s+") + m2 := regexp.MustCompile("\\s+$") + m3 := regexp.MustCompile("\\s+") + fieldValue = m1.ReplaceAllString(fieldValue, "") + fieldValue = m2.ReplaceAllString(fieldValue, "") + fieldValue = m3.ReplaceAllString(fieldValue, " ") + return fieldValue +} diff --git a/cmd/tsbs_generate_queries/databases/redistimeseries/common.go b/cmd/tsbs_generate_queries/databases/redistimeseries/common.go new file mode 100644 index 000000000..222755856 --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/redistimeseries/common.go @@ -0,0 +1,54 @@ +package redistimeseries + +import ( + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/utils" + "github.com/timescale/tsbs/pkg/query" +) + +// BaseGenerator contains settings specific for RedisTimeSeries database. +type BaseGenerator struct { +} + +// GenerateEmptyQuery returns an empty query.Cassandra. +func (g *BaseGenerator) GenerateEmptyQuery() query.Query { + return query.NewRedisTimeSeries() +} + +// fill Query fills the query struct with data +func (d *BaseGenerator) fillInQueryStrings(qi query.Query, humanLabel, humanDesc string) { + q := qi.(*query.RedisTimeSeries) + q.HumanLabel = []byte(humanLabel) + q.HumanDescription = []byte(humanDesc) +} + +// AddQuery adds a command to be executed in the full flow of this Query +func (d *BaseGenerator) AddQuery(qi query.Query, tq [][]byte, commandname []byte) { + q := qi.(*query.RedisTimeSeries) + q.AddQuery(tq, commandname) +} + +// SetApplyFunctor sets SetApplyFunctor used for this Query +func (d *BaseGenerator) SetApplyFunctor(qi query.Query, value bool, functor string) { + q := qi.(*query.RedisTimeSeries) + q.SetApplyFunctor(value) + q.SetFunctor(functor) +} + +// NewDevops creates a new devops use case query generator. +func (g *BaseGenerator) NewDevops(start, end time.Time, scale int) (utils.QueryGenerator, error) { + core, err := devops.NewCore(start, end, scale) + + if err != nil { + return nil, err + } + + var devops utils.QueryGenerator = &Devops{ + BaseGenerator: g, + Core: core, + } + + return devops, nil +} diff --git a/cmd/tsbs_generate_queries/databases/redistimeseries/devops.go b/cmd/tsbs_generate_queries/databases/redistimeseries/devops.go new file mode 100644 index 000000000..fb9b71c3d --- /dev/null +++ b/cmd/tsbs_generate_queries/databases/redistimeseries/devops.go @@ -0,0 +1,281 @@ +package redistimeseries + +import ( + "fmt" + "time" + + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" + "github.com/timescale/tsbs/pkg/query" +) + +// TODO: Remove the need for this by continuing to bubble up errors +func panicIfErr(err error) { + if err != nil { + panic(err.Error()) + } +} + +const ( + oneMinuteMillis = 60 * 1000 + oneHourMillis = oneMinuteMillis * 60 +) + +// Devops produces RedisTimeSeries-specific queries for all the devops query types. +type Devops struct { + *BaseGenerator + *devops.Core +} + +// GenerateEmptyQuery returns an empty query.RedisTimeSeries +func (d *Devops) GenerateEmptyQuery() query.Query { + return query.NewRedisTimeSeries() +} + +// GroupByTime fetches the MAX for numMetrics metrics under 'cpu', per minute for nhosts hosts, +// every 5 mins for 1 hour +func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange time.Duration) { + interval := d.Interval.MustRandWindow(timeRange) + redisQuery := [][]byte{ + //[]byte("TS.MRANGE"), Just to help understanding + []byte(fmt.Sprintf("%d", interval.StartUnixMillis())), + []byte(fmt.Sprintf("%d", interval.EndUnixMillis())), + []byte("AGGREGATION"), + []byte("MAX"), + []byte(fmt.Sprintf("%d", oneMinuteMillis)), + []byte("FILTER"), + []byte("measurement=cpu"), + } + + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + + // we only need to filter if we we dont want all of them + if numMetrics != devops.GetCPUMetricsLen() { + redisArg := "fieldname=" + if numMetrics > 1 { + redisArg += "(" + } + for idx, value := range metrics { + redisArg += value + if idx != (numMetrics - 1) { + redisArg += "," + } + } + if numMetrics > 1 { + redisArg += ")" + } + redisQuery = append(redisQuery, []byte(redisArg)) + } + + hostnames, err := d.GetRandomHosts(nHosts) + panicIfErr(err) + + // add specific fieldname if needed. + redisArg := "hostname=" + if nHosts > 1 { + redisArg += "(" + } + for idx, value := range hostnames { + redisArg += value + if idx != (nHosts - 1) { + redisArg += "," + } + } + if nHosts > 1 { + redisArg += ")" + } + redisQuery = append(redisQuery, []byte(redisArg)) + + if nHosts > 1 && numMetrics == 1 { + redisQuery = append(redisQuery, []byte("GROUPBY"), []byte("hostname"), []byte("REDUCE"), []byte("max")) + } + if numMetrics > 1 { + redisQuery = append(redisQuery, []byte("GROUPBY"), []byte("fieldname"), []byte("REDUCE"), []byte("max")) + } + + humanLabel := devops.GetSingleGroupByLabel("RedisTimeSeries", numMetrics, nHosts, string(timeRange)) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQueryStrings(qi, humanLabel, humanDesc) + d.AddQuery(qi, redisQuery, []byte("TS.MRANGE")) + +} + +// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day +func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) { + interval := d.Interval.MustRandWindow(devops.DoubleGroupByDuration) + redisQuery := [][]byte{ + //[]byte("TS.MRANGE"), Just to help understanding + []byte(fmt.Sprintf("%d", interval.StartUnixMillis())), + []byte(fmt.Sprintf("%d", interval.EndUnixMillis())), + []byte("AGGREGATION"), + []byte("AVG"), + []byte(fmt.Sprintf("%d", oneHourMillis)), + []byte("FILTER"), + []byte("measurement=cpu"), + } + + metrics, err := devops.GetCPUMetricsSlice(numMetrics) + panicIfErr(err) + + // add specific fieldname if needed. + if numMetrics != devops.GetCPUMetricsLen() { + redisArg := "fieldname=" + if numMetrics > 1 { + redisArg += "(" + } + for idx, value := range metrics { + redisArg += value + if idx != (numMetrics - 1) { + redisArg += "," + } + } + if numMetrics > 1 { + redisArg += ")" + } + redisQuery = append(redisQuery, []byte(redisArg)) + } + if numMetrics > 1 { + redisQuery = append(redisQuery, []byte("GROUPBY"), []byte("hostname"), []byte("REDUCE"), []byte("max")) + } + + humanLabel := devops.GetDoubleGroupByLabel("RedisTimeSeries", numMetrics) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQueryStrings(qi, humanLabel, humanDesc) + d.AddQuery(qi, redisQuery, []byte("TS.MRANGE")) + functorName := query.GetFunctionName(query.GroupByTimeAndTagHostname) + d.SetApplyFunctor(qi, true, functorName) +} + +// MaxAllCPU fetches the aggregate across all CPU metrics per hour over 1 hour for a single host. +// Currently only one host is supported +func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) { + interval := d.Interval.MustRandWindow(devops.MaxAllDuration) + hostnames, err := d.GetRandomHosts(nHosts) + panicIfErr(err) + redisQuery := [][]byte{ + //[]byte("TS.MRANGE"), Just to help understanding + []byte(fmt.Sprintf("%d", interval.StartUnixMillis())), + []byte(fmt.Sprintf("%d", interval.EndUnixMillis())), + []byte("AGGREGATION"), + []byte("MAX"), + []byte(fmt.Sprintf("%d", oneHourMillis)), + []byte("FILTER"), + []byte("measurement=cpu"), + } + + redisArg := "hostname=" + if nHosts > 1 { + redisArg += "(" + } + for idx, value := range hostnames { + redisArg += value + if idx != (nHosts - 1) { + redisArg += "," + } + } + if nHosts > 1 { + redisArg += ")" + } + redisQuery = append(redisQuery, []byte(redisArg)) + if nHosts > 1 { + redisQuery = append(redisQuery, []byte("GROUPBY"), []byte("fieldname"), []byte("REDUCE"), []byte("max")) + } + humanLabel := devops.GetMaxAllLabel("RedisTimeSeries", nHosts) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQueryStrings(qi, humanLabel, humanDesc) + + d.AddQuery(qi, redisQuery, []byte("TS.MRANGE")) +} + +// LastPointPerHost finds the last row for every host in the dataset +func (d *Devops) LastPointPerHost(qi query.Query) { + redisQuery := [][]byte{ + []byte("SELECTED_LABELS"), + []byte("hostname"), + []byte("fieldname"), + []byte("FILTER"), + []byte("measurement=cpu"), + []byte("hostname!="), + } + + humanLabel := "RedisTimeSeries last row per host" + humanDesc := fmt.Sprintf("%s", humanLabel) + d.fillInQueryStrings(qi, humanLabel, humanDesc) + d.AddQuery(qi, redisQuery, []byte("TS.MGET")) +} + +// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high +// usage between a time period for a number of hosts (if 0, it will search all hosts), +// e.g. in pseudo-SQL: +// +// SELECT * FROM cpu +// WHERE usage_user > 90.0 +// AND time >= '$TIME_START' AND time < '$TIME_END' +// AND (hostname = '$HOST' OR hostname = '$HOST2'...) +// +// Resultsets: +// high-cpu-1 +// high-cpu-all +func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) { + hostnames, err := d.GetRandomHosts(nHosts) + interval := d.Interval.MustRandWindow(devops.HighCPUDuration) + redisQuery := [][]byte{ + //[]byte("TS.MRANGE"), Just to help understanding + []byte(fmt.Sprintf("%d", interval.StartUnixMillis())), + []byte(fmt.Sprintf("%d", interval.EndUnixMillis())), + []byte("FILTER_BY_VALUE"), []byte("90.0"), []byte("1000"), + []byte("FILTER"), + []byte("fieldname=usage_user"), + []byte("measurement=cpu"), + } + if nHosts > 0 { + redisArg := "hostname=" + if nHosts > 1 { + redisArg += "(" + } + for idx, value := range hostnames { + redisArg += value + if idx != (nHosts - 1) { + redisArg += "," + } + } + if nHosts > 1 { + redisArg += ")" + } + redisQuery = append(redisQuery, []byte(redisArg)) + } + redisQuery = append(redisQuery, []byte("GROUPBY"), []byte("fieldname"), []byte("REDUCE"), []byte("max")) + + humanLabel, err := devops.GetHighCPULabel("RedisTimeSeries", nHosts) + panicIfErr(err) + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString()) + d.fillInQueryStrings(qi, humanLabel, humanDesc) + d.AddQuery(qi, redisQuery, []byte("TS.MRANGE")) + d.SetApplyFunctor(qi, true, "FILTER_BY_TS") +} + +// GroupByOrderByLimit populates a query.Query that has a time WHERE clause, that groups by a truncated date, orders by that date, and takes a limit: +func (d *Devops) GroupByOrderByLimit(qi query.Query) { + + interval := d.Interval.MustRandWindow(time.Hour) + redisQuery := [][]byte{ + //[]byte("TS.MREVRANGE"), Just to help understanding + []byte("-"), + []byte(fmt.Sprintf("%d", interval.EndUnixMillis())), + []byte("COUNT"), + []byte("5"), + []byte("AGGREGATION"), + []byte("MAX"), + []byte(fmt.Sprintf("%d", oneMinuteMillis)), + []byte("FILTER"), + []byte("measurement=cpu"), + []byte("fieldname=usage_user"), + } + + humanLabel := devops.GetGroupByOrderByLimitLabel("RedisTimeSeries") + humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.EndString()) + + d.fillInQueryStrings(qi, humanLabel, humanDesc) + d.AddQuery(qi, redisQuery, []byte("TS.MREVRANGE")) + +} diff --git a/cmd/tsbs_generate_queries/databases/victoriametrics/devops.go b/cmd/tsbs_generate_queries/databases/victoriametrics/devops.go index 16551b1aa..f6b571547 100644 --- a/cmd/tsbs_generate_queries/databases/victoriametrics/devops.go +++ b/cmd/tsbs_generate_queries/databases/victoriametrics/devops.go @@ -91,13 +91,13 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qq query.Query, numMetrics int) { // {hostname=~"hostname1|hostname2...|hostnameN"}[1h] // ) // ) by (__name__) -func (d *Devops) MaxAllCPU(qq query.Query, nHosts int) { +func (d *Devops) MaxAllCPU(qq query.Query, nHosts int, duration time.Duration) { hosts := d.mustGetRandomHosts(nHosts) selectClause := getSelectClause(devops.GetAllCPUMetrics(), hosts) qi := &queryInfo{ query: fmt.Sprintf("max(max_over_time(%s[1h])) by (__name__)", selectClause), label: devops.GetMaxAllLabel("VictoriaMetrics", nHosts), - interval: d.Interval.MustRandWindow(devops.MaxAllDuration), + interval: d.Interval.MustRandWindow(duration), step: "3600", } d.fillInQuery(qq, qi) diff --git a/cmd/tsbs_generate_queries/databases/victoriametrics/devops_test.go b/cmd/tsbs_generate_queries/databases/victoriametrics/devops_test.go index 7496ddadc..809102375 100644 --- a/cmd/tsbs_generate_queries/databases/victoriametrics/devops_test.go +++ b/cmd/tsbs_generate_queries/databases/victoriametrics/devops_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/timescale/tsbs/cmd/tsbs_generate_queries/uses/devops" "github.com/timescale/tsbs/pkg/query" ) @@ -47,7 +48,7 @@ func Test_what(t *testing.T) { }, "MaxAllCPU": { fn: func(g *Devops, q *query.HTTP) { - g.MaxAllCPU(q, 5) + g.MaxAllCPU(q, 5, devops.MaxAllDuration) }, expQuery: "max(max_over_time({__name__=~'cpu_(usage_user|usage_system|usage_idle|usage_nice|usage_iowait|usage_irq|usage_softirq|usage_steal|usage_guest|usage_guest_nice)', hostname=~'host_5|host_9|host_3|host_1|host_7'}[1h])) by (__name__)", expStep: "3600", diff --git a/cmd/tsbs_generate_queries/uses/devops/common.go b/cmd/tsbs_generate_queries/uses/devops/common.go index f987eec34..42511b07e 100644 --- a/cmd/tsbs_generate_queries/uses/devops/common.go +++ b/cmd/tsbs_generate_queries/uses/devops/common.go @@ -120,6 +120,12 @@ type HighCPUFiller interface { HighCPUForHosts(query.Query, int) } +// GetSingleGroupByLabel returns the Query human-readable label for DoubleGroupBy queries +func GetSingleGroupByLabel(dbName string, numMetrics, nHosts int, timerange string) string { + return fmt.Sprintf("%s MAX of %d metrics,random %4d hosts, random %s by 1m", dbName, numMetrics, nHosts, timerange) + +} + // GetDoubleGroupByLabel returns the Query human-readable label for DoubleGroupBy queries func GetDoubleGroupByLabel(dbName string, numMetrics int) string { return fmt.Sprintf("%s mean of %d metrics, all hosts, random %s by 1h", dbName, numMetrics, DoubleGroupByDuration) @@ -143,6 +149,11 @@ func GetMaxAllLabel(dbName string, nHosts int) string { return fmt.Sprintf("%s max of all CPU metrics, random %4d hosts, random %s by 1h", dbName, nHosts, MaxAllDuration) } +// GetMaxAllLabel returns the Query human-readable label for MaxAllCPU queries +func GetGroupByOrderByLimitLabel(dbName string) string { + return fmt.Sprintf("%s max cpu over last 5 min-intervals (random end)", dbName) +} + // getRandomHosts returns a subset of numHosts hostnames of a permutation of hostnames, // numbered from 0 to totalHosts. // Ex.: host_12, host_7, host_25 for numHosts=3 and totalHosts=30 (3 out of 30) diff --git a/cmd/tsbs_load_influx/creator.go b/cmd/tsbs_load_influx/creator.go index 28fc9a6bb..1f7f8d12c 100644 --- a/cmd/tsbs_load_influx/creator.go +++ b/cmd/tsbs_load_influx/creator.go @@ -33,8 +33,16 @@ func (d *dbCreator) DBExists(dbName string) bool { } func (d *dbCreator) listDatabases() ([]string, error) { + client := http.Client{} u := fmt.Sprintf("%s/query?q=show%%20databases", d.daemonURL) - resp, err := http.Get(u) + req, err := http.NewRequest("GET", u, nil) + if authToken != "" { + req.Header = http.Header{ + headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)}, + } + } + resp, err := client.Do(req) + if err != nil { return nil, fmt.Errorf("listDatabases error: %s", err.Error()) } @@ -61,20 +69,30 @@ func (d *dbCreator) listDatabases() ([]string, error) { } ret := []string{} - for _, nestedName := range listing.Results[0].Series[0].Values { - name := nestedName[0] - // the _internal database is skipped: - if name == "_internal" { - continue + if len(listing.Results) > 0 { + for _, nestedName := range listing.Results[0].Series[0].Values { + name := nestedName[0] + // the _internal database is skipped: + if name == "_internal" { + continue + } + ret = append(ret, name) } - ret = append(ret, name) } return ret, nil } func (d *dbCreator) RemoveOldDB(dbName string) error { u := fmt.Sprintf("%s/query?q=drop+database+%s", d.daemonURL, dbName) - resp, err := http.Post(u, "text/plain", nil) + client := http.Client{} + req, err := http.NewRequest("POST", u, nil) + if authToken != "" { + req.Header = http.Header{ + "Content-Type": []string{"text/plain"}, + headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)}, + } + } + resp, err := client.Do(req) if err != nil { return fmt.Errorf("drop db error: %s", err.Error()) } @@ -99,6 +117,11 @@ func (d *dbCreator) CreateDB(dbName string) error { u.RawQuery = v.Encode() req, err := http.NewRequest("GET", u.String(), nil) + if authToken != "" { + req.Header = http.Header{ + headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)}, + } + } if err != nil { return err } diff --git a/cmd/tsbs_load_influx/http_writer.go b/cmd/tsbs_load_influx/http_writer.go index b56ae2d8e..a53ce989f 100644 --- a/cmd/tsbs_load_influx/http_writer.go +++ b/cmd/tsbs_load_influx/http_writer.go @@ -14,6 +14,7 @@ import ( const ( httpClientName = "tsbs_load_influx" headerContentEncoding = "Content-Encoding" + headerAuthorization = "Authorization" headerGzip = "gzip" ) @@ -65,13 +66,16 @@ var ( textPlain = []byte("text/plain") ) -func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool) { +func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool, authToken string) { req.Header.SetContentTypeBytes(textPlain) req.Header.SetMethodBytes(methodPost) req.Header.SetRequestURIBytes(w.url) if isGzip { req.Header.Add(headerContentEncoding, headerGzip) } + if authToken != "" { + req.Header.Add(headerAuthorization, fmt.Sprintf("Token %s", authToken)) + } req.SetBody(body) } @@ -96,7 +100,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response) func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) { req := fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, body, isGzip) + w.initializeReq(req, body, isGzip, authToken) resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) diff --git a/cmd/tsbs_load_influx/http_writer_test.go b/cmd/tsbs_load_influx/http_writer_test.go index 170ae4ea3..ba27656c2 100644 --- a/cmd/tsbs_load_influx/http_writer_test.go +++ b/cmd/tsbs_load_influx/http_writer_test.go @@ -114,7 +114,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) { defer fasthttp.ReleaseRequest(req) w := NewHTTPWriter(testConf, testConsistency) body := "this is a test body" - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") if got := string(req.Body()); got != body { t.Errorf("non-gzip: body not correct: got '%s' want '%s'", got, body) @@ -129,7 +129,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) { t.Errorf("non-gzip: Content-Encoding is not empty: got %s", got) } - w.initializeReq(req, []byte(body), true) + w.initializeReq(req, []byte(body), true, "") if got := string(req.Header.Peek(headerContentEncoding)); got != headerGzip { t.Errorf("gzip: Content-Encoding is not correct: got %s want %s", got, headerGzip) } @@ -144,7 +144,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w := NewHTTPWriter(testConf, testConsistency) body := "this is a test body" normalURL := w.url // save for later modification - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) lat, err := w.executeReq(req, resp) @@ -161,7 +161,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldBackoffParam)) req = fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") lat, err = w.executeReq(req, resp) if err != errBackoff { t.Errorf("unexpected error response received (not backoff error): %v", err) @@ -176,7 +176,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) { w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldInvalidParam)) req = fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - w.initializeReq(req, []byte(body), false) + w.initializeReq(req, []byte(body), false, "") lat, err = w.executeReq(req, resp) if err == nil { t.Errorf("unexpected non-error response received") diff --git a/cmd/tsbs_load_influx/main.go b/cmd/tsbs_load_influx/main.go index f6b85f1b5..268ffa95c 100644 --- a/cmd/tsbs_load_influx/main.go +++ b/cmd/tsbs_load_influx/main.go @@ -30,6 +30,9 @@ var ( useGzip bool doAbortOnExist bool consistency string + authToken string // InfluxDB v2 + bucketId string // InfluxDB v2 + orgId string // InfluxDB v2 ) // Global vars @@ -73,6 +76,8 @@ func init() { csvDaemonURLs = viper.GetString("urls") replicationFactor = viper.GetInt("replication-factor") consistency = viper.GetString("consistency") + authToken = viper.GetString("auth-token") + orgId = viper.GetString("org") backoff = viper.GetDuration("backoff") useGzip = viper.GetBool("gzip") @@ -80,6 +85,12 @@ func init() { log.Fatalf("invalid consistency settings") } + if authToken != "" { + log.Println("Using Authorization header in benchmark") + } else { + log.Println("Given no Authorization header was provided will not send it in benchmark") + } + daemonURLs = strings.Split(csvDaemonURLs, ",") if len(daemonURLs) == 0 { log.Fatal("missing 'urls' flag") diff --git a/cmd/tsbs_load_questdb/creator.go b/cmd/tsbs_load_questdb/creator.go new file mode 100644 index 000000000..fc0df7634 --- /dev/null +++ b/cmd/tsbs_load_questdb/creator.go @@ -0,0 +1,86 @@ +package main + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + "time" +) + +type dbCreator struct { + questdbRESTEndPoint string +} + +func (d *dbCreator) Init() { + d.questdbRESTEndPoint = questdbRESTEndPoint +} + +func (d *dbCreator) DBExists(dbName string) bool { + r, err := execQuery(questdbRESTEndPoint, "SHOW TABLES") + if err != nil { + panic(fmt.Errorf("fatal error, failed to query questdb: %s", err)) + } + for i, v := range r.Dataset { + if i >= 0 && v[0] == "cpu" { + panic(fmt.Errorf("fatal error, cpu table already exists")) + } + } + // Create minimal table with o3 params + // r, err = execQuery(questdbRESTEndPoint, "CREATE TABLE cpu (hostname SYMBOL, region SYMBOL, datacenter SYMBOL, rack SYMBOL, os SYMBOL, arch SYMBOL, team SYMBOL, service SYMBOL, service_version SYMBOL, service_environment SYMBOL, usage_user LONG, usage_system LONG, usage_idle LONG, usage_nice LONG, usage_iowait LONG, usage_irq LONG, usage_softirq LONG, usage_steal LONG, usage_guest LONG, usage_guest_nice LONG, timestamp TIMESTAMP) timestamp(timestamp) PARTITION BY DAY WITH o3MaxUncommittedRows=500000, o3CommitHysteresis=300s") + // if err != nil { + // panic(fmt.Errorf("fatal error, failed to create cpu table: %s", err)) + // } + + return false +} + +func (d *dbCreator) RemoveOldDB(dbName string) error { + return nil +} + +func (d *dbCreator) CreateDB(dbName string) error { + time.Sleep(time.Second) + return nil +} + +type QueryResponseColumns struct { + Name string + Type string +} + +type QueryResponse struct { + Query string + Columns []QueryResponseColumns + Dataset [][]interface{} + Count int + Error string +} + +func execQuery(uriRoot string, query string) (QueryResponse, error) { + var qr QueryResponse + if strings.HasSuffix(uriRoot, "/") { + uriRoot = uriRoot[:len(uriRoot)-1] + } + uriRoot = uriRoot + "/exec?query=" + url.QueryEscape(query) + resp, err := http.Get(uriRoot) + if err != nil { + return qr, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return qr, err + } + err = json.Unmarshal(body, &qr) + if err != nil { + return qr, err + } + if qr.Error != "" { + return qr, errors.New(qr.Error) + } + return qr, nil +} diff --git a/cmd/tsbs_load_questdb/main.go b/cmd/tsbs_load_questdb/main.go new file mode 100644 index 000000000..852700975 --- /dev/null +++ b/cmd/tsbs_load_questdb/main.go @@ -0,0 +1,106 @@ +// bulk_load_questdb loads an QuestDB daemon with data from stdin. +// +// The caller is responsible for assuring that the database is empty before +// bulk load. +package main + +import ( + "bufio" + "bytes" + "fmt" + "log" + "sync" + "time" + + "github.com/blagojts/viper" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/internal/utils" + "github.com/timescale/tsbs/load" + "github.com/timescale/tsbs/pkg/targets" + "github.com/timescale/tsbs/pkg/targets/constants" + "github.com/timescale/tsbs/pkg/targets/initializers" +) + +// Program option vars: +var ( + questdbRESTEndPoint string + questdbILPBindTo string + doAbortOnExist bool +) + +// Global vars +var ( + loader load.BenchmarkRunner + config load.BenchmarkRunnerConfig + bufPool sync.Pool + target targets.ImplementedTarget +) + +// allows for testing +var fatal = log.Fatalf + +// Parse args: +func init() { + target = initializers.GetTarget(constants.FormatQuestDB) + config = load.BenchmarkRunnerConfig{} + // Not all the default flags apply to QuestDB + // config.AddToFlagSet(pflag.CommandLine) + pflag.CommandLine.Uint("batch-size", 10000, "Number of items to batch together in a single insert") + pflag.CommandLine.Uint("workers", 1, "Number of parallel clients inserting") + pflag.CommandLine.Uint64("limit", 0, "Number of items to insert (0 = all of them).") + pflag.CommandLine.Bool("do-load", true, "Whether to write data. Set this flag to false to check input read speed.") + pflag.CommandLine.Duration("reporting-period", 10*time.Second, "Period to report write stats") + pflag.CommandLine.String("file", "", "File name to read data from") + pflag.CommandLine.Int64("seed", 0, "PRNG seed (default: 0, which uses the current timestamp)") + pflag.CommandLine.String("insert-intervals", "", "Time to wait between each insert, default '' => all workers insert ASAP. '1,2' = worker 1 waits 1s between inserts, worker 2 and others wait 2s") + pflag.CommandLine.Bool("hash-workers", false, "Whether to consistently hash insert data to the same workers (i.e., the data for a particular host always goes to the same worker)") + target.TargetSpecificFlags("", pflag.CommandLine) + pflag.Parse() + + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + if err := viper.Unmarshal(&config); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + + questdbRESTEndPoint = viper.GetString("url") + questdbILPBindTo = viper.GetString("ilp-bind-to") + config.HashWorkers = false + loader = load.GetBenchmarkRunner(config) +} + +type benchmark struct{} + +func (b *benchmark) GetDataSource() targets.DataSource { + return &fileDataSource{scanner: bufio.NewScanner(load.GetBufferedReader(config.FileName))} +} + +func (b *benchmark) GetBatchFactory() targets.BatchFactory { + return &factory{} +} + +func (b *benchmark) GetPointIndexer(_ uint) targets.PointIndexer { + return &targets.ConstantIndexer{} +} + +func (b *benchmark) GetProcessor() targets.Processor { + return &processor{} +} + +func (b *benchmark) GetDBCreator() targets.DBCreator { + return &dbCreator{} +} + +func main() { + bufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 4*1024*1024)) + }, + } + + loader.RunBenchmark(&benchmark{}) +} diff --git a/cmd/tsbs_load_questdb/process.go b/cmd/tsbs_load_questdb/process.go new file mode 100644 index 000000000..c7fe24f40 --- /dev/null +++ b/cmd/tsbs_load_questdb/process.go @@ -0,0 +1,51 @@ +package main + +import ( + "fmt" + "net" + + "github.com/timescale/tsbs/pkg/targets" +) + +// allows for testing +var printFn = fmt.Printf + +type processor struct { + ilpConn (*net.TCPConn) +} + +func (p *processor) Init(numWorker int, _, _ bool) { + tcpAddr, err := net.ResolveTCPAddr("tcp4", questdbILPBindTo) + if err != nil { + fatal("Failed to resolve %s: %s\n", questdbILPBindTo, err.Error()) + } + p.ilpConn, err = net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + fatal("Failed connect to %s: %s\n", questdbILPBindTo, err.Error()) + } +} + +func (p *processor) Close(_ bool) { + defer p.ilpConn.Close() +} + +func (p *processor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) { + batch := b.(*batch) + + // Write the batch: try until backoff is not needed. + if doLoad { + var err error + _, err = p.ilpConn.Write(batch.buf.Bytes()) + if err != nil { + fatal("Error writing: %s\n", err.Error()) + } + } + + metricCnt := batch.metrics + rowCnt := batch.rows + + // Return the batch buffer to the pool. + batch.buf.Reset() + bufPool.Put(batch.buf) + return metricCnt, uint64(rowCnt) +} diff --git a/cmd/tsbs_load_questdb/process_test.go b/cmd/tsbs_load_questdb/process_test.go new file mode 100644 index 000000000..16c9d4833 --- /dev/null +++ b/cmd/tsbs_load_questdb/process_test.go @@ -0,0 +1,124 @@ +package main + +import ( + "bytes" + "fmt" + "io" + "net" + "sync" + "testing" + "time" + + "github.com/timescale/tsbs/pkg/data" +) + +func emptyLog(_ string, _ ...interface{}) (int, error) { + return 0, nil +} + +type mockServer struct { + ln net.Listener + listenPort int +} + +func mockServerStop(ms *mockServer) { + ms.ln.Close() +} + +func mockServerStart() *mockServer { + ln, err := net.Listen("tcp", ":0") + if err != nil { + fatal("Failed to start server listen socket: %s\n", err.Error()) + } + fmt.Println("Mock TCP server listening on port:", ln.Addr().(*net.TCPAddr).Port) + ms := &mockServer{ + ln: ln, + listenPort: ln.Addr().(*net.TCPAddr).Port, + } + go func() { + for { + + conn, err := ln.Accept() + if err != nil { + // listen socket is closed + return + } + go func() { + data := make([]byte, 512) + for { + rc, err := conn.Read(data) + if err != nil { + if err != io.EOF { + fatal("failed to read from connection: ", err.Error()) + } + return + } + fmt.Println(conn, " read ", rc) + } + }() + } + }() + return ms +} + +func TestProcessorInit(t *testing.T) { + ms := mockServerStart() + defer mockServerStop(ms) + questdbILPBindTo = fmt.Sprintf("127.0.0.1:%d", ms.listenPort) + printFn = emptyLog + p := &processor{} + p.Init(0, false, false) + p.Close(true) + + p = &processor{} + p.Init(1, false, false) + p.Close(true) +} + +func TestProcessorProcessBatch(t *testing.T) { + bufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 4*1024*1024)) + }, + } + f := &factory{} + b := f.New().(*batch) + pt := data.LoadedPoint{ + Data: []byte("tag1=tag1val,tag2=tag2val col1=0.0,col2=0.0 140\n"), + } + b.Append(pt) + + cases := []struct { + doLoad bool + }{ + { + doLoad: false, + }, + { + doLoad: true, + }, + } + + for _, c := range cases { + fatal = func(format string, args ...interface{}) { + t.Errorf("fatal called for case %v unexpectedly\n", c) + fmt.Printf(format, args...) + } + + ms := mockServerStart() + questdbILPBindTo = fmt.Sprintf("127.0.0.1:%d", ms.listenPort) + + p := &processor{} + p.Init(0, true, true) + mCnt, rCnt := p.ProcessBatch(b, c.doLoad) + if mCnt != b.metrics { + t.Errorf("process batch returned less metrics than batch: got %d want %d", mCnt, b.metrics) + } + if rCnt != uint64(b.rows) { + t.Errorf("process batch returned less rows than batch: got %d want %d", rCnt, b.rows) + } + p.Close(true) + mockServerStop(ms) + time.Sleep(50 * time.Millisecond) + } +} diff --git a/cmd/tsbs_load_questdb/scan.go b/cmd/tsbs_load_questdb/scan.go new file mode 100644 index 000000000..c4605d7aa --- /dev/null +++ b/cmd/tsbs_load_questdb/scan.go @@ -0,0 +1,65 @@ +package main + +import ( + "bufio" + "bytes" + "strings" + + "github.com/timescale/tsbs/pkg/data" + "github.com/timescale/tsbs/pkg/data/usecases/common" + "github.com/timescale/tsbs/pkg/targets" +) + +const errNotThreeTuplesFmt = "parse error: line does not have 3 tuples, has %d" + +var newLine = []byte("\n") + +type fileDataSource struct { + scanner *bufio.Scanner +} + +func (d *fileDataSource) NextItem() data.LoadedPoint { + ok := d.scanner.Scan() + if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF + return data.LoadedPoint{} + } else if !ok { + fatal("scan error: %v", d.scanner.Err()) + return data.LoadedPoint{} + } + return data.NewLoadedPoint(d.scanner.Bytes()) +} + +func (d *fileDataSource) Headers() *common.GeneratedDataHeaders { return nil } + +type batch struct { + buf *bytes.Buffer + rows uint + metrics uint64 +} + +func (b *batch) Len() uint { + return b.rows +} + +func (b *batch) Append(item data.LoadedPoint) { + that := item.Data.([]byte) + thatStr := string(that) + b.rows++ + // Each influx line is format "csv-tags csv-fields timestamp", so we split by space + // and then on the middle element, we split by comma to count number of fields added + args := strings.Split(thatStr, " ") + if len(args) != 3 { + fatal(errNotThreeTuplesFmt, len(args)) + return + } + b.metrics += uint64(len(strings.Split(args[1], ","))) + + b.buf.Write(that) + b.buf.Write(newLine) +} + +type factory struct{} + +func (f *factory) New() targets.Batch { + return &batch{buf: bufPool.Get().(*bytes.Buffer)} +} diff --git a/cmd/tsbs_load_questdb/scan_test.go b/cmd/tsbs_load_questdb/scan_test.go new file mode 100644 index 000000000..a4144946f --- /dev/null +++ b/cmd/tsbs_load_questdb/scan_test.go @@ -0,0 +1,105 @@ +package main + +import ( + "bufio" + "bytes" + "fmt" + "sync" + "testing" + + "github.com/timescale/tsbs/pkg/data" +) + +func TestBatch(t *testing.T) { + bufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, 4*1024*1024)) + }, + } + f := &factory{} + b := f.New().(*batch) + if b.Len() != 0 { + t.Errorf("batch not initialized with count 0") + } + p := data.LoadedPoint{ + Data: []byte("tag1=tag1val,tag2=tag2val col1=0.0,col2=0.0 140"), + } + b.Append(p) + if b.Len() != 1 { + t.Errorf("batch count is not 1 after first append") + } + if b.rows != 1 { + t.Errorf("batch row count is not 1 after first append") + } + if b.metrics != 2 { + t.Errorf("batch metric count is not 2 after first append") + } + + p = data.LoadedPoint{ + Data: []byte("tag1=tag1val,tag2=tag2val col1=1.0,col2=1.0 190"), + } + b.Append(p) + if b.Len() != 2 { + t.Errorf("batch count is not 1 after first append") + } + if b.rows != 2 { + t.Errorf("batch row count is not 1 after first append") + } + if b.metrics != 4 { + t.Errorf("batch metric count is not 2 after first append") + } + + p = data.LoadedPoint{ + Data: []byte("bad_point"), + } + errMsg := "" + fatal = func(f string, args ...interface{}) { + errMsg = fmt.Sprintf(f, args...) + } + b.Append(p) + if errMsg == "" { + t.Errorf("batch append did not error with ill-formed point") + } +} + +func TestFileDataSourceNextItem(t *testing.T) { + cases := []struct { + desc string + input string + result []byte + shouldFatal bool + }{ + { + desc: "correct input", + input: "cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140\n", + result: []byte("cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140"), + }, + { + desc: "correct input with extra", + input: "cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140\nextra_is_ignored", + result: []byte("cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140"), + }, + } + + for _, c := range cases { + br := bufio.NewReader(bytes.NewReader([]byte(c.input))) + ds := &fileDataSource{scanner: bufio.NewScanner(br)} + p := ds.NextItem() + data := p.Data.([]byte) + if !bytes.Equal(data, c.result) { + t.Errorf("%s: incorrect result: got\n%v\nwant\n%v", c.desc, data, c.result) + } + } +} + +func TestDecodeEOF(t *testing.T) { + input := []byte("cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140") + br := bufio.NewReader(bytes.NewReader([]byte(input))) + ds := &fileDataSource{scanner: bufio.NewScanner(br)} + _ = ds.NextItem() + // nothing left, should be EOF + p := ds.NextItem() + if p.Data != nil { + t.Errorf("expected p to be nil, got %v", p) + } +} diff --git a/cmd/tsbs_load_redistimeseries/benchmark.go b/cmd/tsbs_load_redistimeseries/benchmark.go new file mode 100644 index 000000000..fb70fe0fe --- /dev/null +++ b/cmd/tsbs_load_redistimeseries/benchmark.go @@ -0,0 +1,33 @@ +package main + +import ( + "bufio" + "github.com/timescale/tsbs/load" + "github.com/timescale/tsbs/pkg/targets" + "log" +) + +func (b *benchmark) GetBatchFactory() targets.BatchFactory { + return &factory{} +} + +func (b *benchmark) GetPointIndexer(maxPartitions uint) targets.PointIndexer { + return &RedisIndexer{partitions: maxPartitions} +} + +func (b *benchmark) GetProcessor() targets.Processor { + return &processor{b.dbc, nil, nil, nil} +} + +func (b *benchmark) GetDBCreator() targets.DBCreator { + return b.dbc +} + +type benchmark struct { + dbc *dbCreator +} + +func (b *benchmark) GetDataSource() targets.DataSource { + log.Printf("creating DS from %s", config.FileName) + return &fileDataSource{scanner: bufio.NewScanner(load.GetBufferedReader(config.FileName))} +} diff --git a/cmd/tsbs_load_redistimeseries/cluster_conn.go b/cmd/tsbs_load_redistimeseries/cluster_conn.go new file mode 100644 index 000000000..e19a46643 --- /dev/null +++ b/cmd/tsbs_load_redistimeseries/cluster_conn.go @@ -0,0 +1,105 @@ +package main + +import ( + "github.com/mediocregopher/radix/v3" + "github.com/timescale/tsbs/pkg/data" + "log" + "strconv" + "strings" + "sync" +) + +func getOSSClusterConn(addr string, opts []radix.DialOpt, clients uint64) *radix.Cluster { + var vanillaCluster *radix.Cluster + var err error + + customConnFunc := func(network, addr string) (radix.Conn, error) { + return radix.Dial(network, addr, opts..., + ) + } + + // this cluster will use the ClientFunc to create a pool to each node in the + // cluster. + poolFunc := func(network, addr string) (radix.Client, error) { + return radix.NewPool(network, addr, int(clients), radix.PoolConnFunc(customConnFunc), radix.PoolPipelineWindow(0, 0)) + } + + vanillaCluster, err = radix.NewCluster([]string{addr}, radix.ClusterPoolFunc(poolFunc)) + if err != nil { + log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err) + } + // Issue CLUSTER SLOTS command + err = vanillaCluster.Sync() + if err != nil { + log.Fatalf("Error preparing for benchmark, while issuing CLUSTER SLOTS. error = %v", err) + } + return vanillaCluster +} + +func nodeThatContainsSlot(slots [][][2]uint16, slot int) (result int) { + result = -1 + for nodePos, slotGroup := range slots { + for _, i2 := range slotGroup { + if slot >= int(i2[0]) && slot < int(i2[1]) { + result = nodePos + return + } + } + } + return +} + +func connectionProcessorCluster(wg *sync.WaitGroup, compressionType string, rows chan string, metrics chan uint64, cluster *radix.Cluster, clusterNodes int, addresses []string, slots [][][2]uint16, conns []radix.Client) { + cmds := make([][]radix.CmdAction, clusterNodes, clusterNodes) + curPipe := make([]uint64, clusterNodes, clusterNodes) + currMetricCount := make([]int, clusterNodes, clusterNodes) + for i := 0; i < clusterNodes; i++ { + cmds[i] = make([]radix.CmdAction, 0, 0) + curPipe[i] = 0 + currMetricCount[i] = 0 + } + + for row := range rows { + slot, cmd, _, metricCount := buildCommand(row, compressionType) + comdPos := nodeThatContainsSlot(slots, slot) + var err error = nil + + currMetricCount[comdPos] += metricCount + cmds[comdPos] = append(cmds[comdPos], cmd) + curPipe[comdPos]++ + + if curPipe[comdPos] == pipeline { + err = conns[comdPos].Do(radix.Pipeline(cmds[comdPos]...)) + if err != nil { + log.Fatalf("Flush failed with %v", err) + } + metrics <- uint64(currMetricCount[comdPos]) + currMetricCount[comdPos] = 0 + cmds[comdPos] = make([]radix.CmdAction, 0, 0) + curPipe[comdPos] = 0 + } + + } + for comdPos, u := range curPipe { + if u > 0 { + var err error = nil + err = conns[comdPos].Do(radix.Pipeline(cmds[comdPos]...)) + if err != nil { + log.Fatalf("Flush failed with %v", err) + } + metrics <- uint64(currMetricCount[comdPos]) + } + } + wg.Done() +} + +type RedisIndexer struct { + partitions uint +} + +func (i *RedisIndexer) GetIndex(p data.LoadedPoint) uint { + row := p.Data.(string) + slotS := strings.Split(row, " ")[0] + clusterSlot, _ := strconv.ParseInt(slotS, 10, 0) + return uint(clusterSlot) % i.partitions +} diff --git a/cmd/tsbs_load_redistimeseries/creator.go b/cmd/tsbs_load_redistimeseries/creator.go new file mode 100644 index 000000000..9e20ca0f1 --- /dev/null +++ b/cmd/tsbs_load_redistimeseries/creator.go @@ -0,0 +1,23 @@ +package main + +type dbCreator struct { +} + +func (d *dbCreator) Init() { +} + +func (d *dbCreator) DBExists(dbName string) bool { + return true +} + +func (d *dbCreator) RemoveOldDB(dbName string) error { + return nil +} + +func (d *dbCreator) CreateDB(dbName string) error { + return nil +} + +func (d *dbCreator) Close() { + +} diff --git a/cmd/tsbs_load_redistimeseries/file_datasource.go b/cmd/tsbs_load_redistimeseries/file_datasource.go new file mode 100644 index 000000000..4ad3fd15b --- /dev/null +++ b/cmd/tsbs_load_redistimeseries/file_datasource.go @@ -0,0 +1,24 @@ +package main + +import ( + "bufio" + "github.com/timescale/tsbs/pkg/data" + "github.com/timescale/tsbs/pkg/data/usecases/common" +) + +type fileDataSource struct { + scanner *bufio.Scanner +} + +func (d *fileDataSource) NextItem() data.LoadedPoint { + ok := d.scanner.Scan() + if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF + return data.LoadedPoint{} + } else if !ok { + fatal("scan error: %v", d.scanner.Err()) + return data.LoadedPoint{} + } + return data.NewLoadedPoint(d.scanner.Text()) +} + +func (d *fileDataSource) Headers() *common.GeneratedDataHeaders { return nil } diff --git a/cmd/tsbs_load_redistimeseries/main.go b/cmd/tsbs_load_redistimeseries/main.go new file mode 100644 index 000000000..fd8ed4208 --- /dev/null +++ b/cmd/tsbs_load_redistimeseries/main.go @@ -0,0 +1,102 @@ +package main + +import ( + "crypto/md5" + "fmt" + "github.com/blagojts/viper" + "github.com/mediocregopher/radix/v3" + "github.com/pkg/errors" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/internal/utils" + "github.com/timescale/tsbs/pkg/targets/constants" + "github.com/timescale/tsbs/pkg/targets/initializers" + "log" + + "github.com/timescale/tsbs/load" + "github.com/timescale/tsbs/pkg/targets" +) + +// Program option vars: +var ( + host string + connections uint64 + pipeline uint64 + checkChunks uint64 + singleQueue bool + dataModel string + compressionType string + clusterMode bool +) + +// Global vars +var ( + loader load.BenchmarkRunner + config load.BenchmarkRunnerConfig + target targets.ImplementedTarget + cluster *radix.Cluster + standalone *radix.Pool + addresses []string + slots [][][2]uint16 + conns []radix.Client +) + +// allows for testing +var fatal = log.Fatal +var md5h = md5.New() +var errorTsCreate = errors.New("ERR TSDB: key already exists") + +// Parse args: +func init() { + target = initializers.GetTarget(constants.FormatRedisTimeSeries) + config = load.BenchmarkRunnerConfig{} + config.AddToFlagSet(pflag.CommandLine) + target.TargetSpecificFlags("", pflag.CommandLine) + pflag.Parse() + + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + if err := viper.Unmarshal(&config); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + host = viper.GetString("host") + connections = viper.GetUint64("connections") + pipeline = viper.GetUint64("pipeline") + dataModel = "redistimeseries" + compressionType = viper.GetString("compression") + clusterMode = viper.GetBool("cluster") + config.NoFlowControl = true + config.HashWorkers = true + loader = load.GetBenchmarkRunner(config) + + opts := make([]radix.DialOpt, 0) + if clusterMode { + cluster = getOSSClusterConn(host, opts, connections) + cluster.Sync() + topology := cluster.Topo().Primaries().Map() + addresses = make([]string, 0) + slots = make([][][2]uint16, 0) + conns = make([]radix.Client, 0) + for nodeAddress, node := range topology { + addresses = append(addresses, nodeAddress) + slots = append(slots, node.Slots) + conn, _ := cluster.Client(nodeAddress) + conns = append(conns, conn) + } + } else { + standalone = getStandaloneConn(host, opts, connections) + } +} + +func main() { + log.Println("Starting benchmark") + + b := benchmark{dbc: &dbCreator{}} + log.Println("Using compression: ", compressionType) + + loader.RunBenchmark(&b) + log.Println("finished benchmark") +} diff --git a/cmd/tsbs_load_redistimeseries/processor.go b/cmd/tsbs_load_redistimeseries/processor.go new file mode 100644 index 000000000..e259dda5e --- /dev/null +++ b/cmd/tsbs_load_redistimeseries/processor.go @@ -0,0 +1,62 @@ +package main + +import ( + "github.com/timescale/tsbs/pkg/targets" + "strconv" + "strings" + "sync" +) + +type processor struct { + dbc *dbCreator + rows []chan string + metrics chan uint64 + wg *sync.WaitGroup +} + +func (p *processor) Init(_ int, _ bool, _ bool) {} + +// ProcessBatch reads eventsBatches which contain rows of data for TS.ADD redis command string +func (p *processor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) { + events := b.(*eventsBatch) + rowCnt := uint64(len(events.rows)) + metricCnt := uint64(0) + + if doLoad { + buflen := rowCnt + 1 + p.rows = make([]chan string, connections) + p.metrics = make(chan uint64, buflen) + p.wg = &sync.WaitGroup{} + + for i := uint64(0); i < connections; i++ { + p.rows[i] = make(chan string, buflen) + p.wg.Add(1) + if clusterMode { + go connectionProcessorCluster(p.wg, compressionType, p.rows[i], p.metrics, cluster, len(addresses), addresses, slots, conns) + } else { + go connectionProcessor(p.wg, compressionType, p.rows[i], p.metrics, standalone) + } + } + for _, row := range events.rows { + slotS := strings.Split(row, " ")[0] + clusterSlot, _ := strconv.ParseInt(slotS, 10, 0) + i := uint64(clusterSlot) % connections + p.rows[i] <- row + } + + for i := uint64(0); i < connections; i++ { + close(p.rows[i]) + } + p.wg.Wait() + close(p.metrics) + for val := range p.metrics { + metricCnt += val + } + } + events.rows = events.rows[:0] + ePool.Put(events) + return metricCnt, rowCnt +} + +func (p *processor) Close(_ bool) { +} diff --git a/cmd/tsbs_load_redistimeseries/scan.go b/cmd/tsbs_load_redistimeseries/scan.go new file mode 100644 index 000000000..55c0367b3 --- /dev/null +++ b/cmd/tsbs_load_redistimeseries/scan.go @@ -0,0 +1,52 @@ +package main + +import ( + "github.com/mediocregopher/radix/v3" + "strconv" + + "strings" + "sync" + + "github.com/timescale/tsbs/pkg/data" + "github.com/timescale/tsbs/pkg/targets" +) + +func buildCommand(line string, compression_type string) (clusterSlot int, cmdA radix.CmdAction, tscreate bool, metricCount int) { + t := strings.Split(line, " ") + metricCount = 1 + tscreate = false + v, _ := strconv.ParseInt(t[0], 10, 0) + clusterSlot = int(v) + cmdname := t[1] + if cmdname == "TS.CREATE" { + tscreate = true + metricCount = 0 + t = append([]string{t[0], t[1], t[2], compression_type}, t[3:]...) + } + if cmdname == "TS.MADD" { + metricCount = (len(t) - 2) / 3 + } + cmdA = radix.Cmd(nil, cmdname, t[2:]...) + return +} + +type eventsBatch struct { + rows []string +} + +func (eb *eventsBatch) Len() uint { + return uint(len(eb.rows)) +} + +func (eb *eventsBatch) Append(item data.LoadedPoint) { + that := item.Data.(string) + eb.rows = append(eb.rows, that) +} + +var ePool = &sync.Pool{New: func() interface{} { return &eventsBatch{rows: []string{}} }} + +type factory struct{} + +func (f *factory) New() targets.Batch { + return ePool.Get().(*eventsBatch) +} diff --git a/cmd/tsbs_load_redistimeseries/standalone_conn.go b/cmd/tsbs_load_redistimeseries/standalone_conn.go new file mode 100644 index 000000000..bafc03e81 --- /dev/null +++ b/cmd/tsbs_load_redistimeseries/standalone_conn.go @@ -0,0 +1,60 @@ +package main + +import ( + "github.com/mediocregopher/radix/v3" + "log" + "sync" +) + +func getStandaloneConn(addr string, opts []radix.DialOpt, clients uint64) *radix.Pool { + var pool *radix.Pool + var err error + + customConnFunc := func(network, addr string) (radix.Conn, error) { + return radix.Dial(network, addr, opts..., + ) + } + network := "tcp" + pool, err = radix.NewPool(network, addr, int(clients), radix.PoolConnFunc(customConnFunc), radix.PoolPipelineWindow(0, 0)) + if err != nil { + log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err) + } + return pool +} + +func connectionProcessor(wg *sync.WaitGroup, compressionType string, rows chan string, metrics chan uint64, conn radix.Client) { + cmds := make([][]radix.CmdAction, 1, 1) + cmds[0] = make([]radix.CmdAction, 0, 0) + curPipe := make([]uint64, 1, 1) + curPipe[0] = 0 + currMetricCount := 0 + comdPos := 0 + + for row := range rows { + _, cmd, _, metricCount := buildCommand(row, compressionType) + currMetricCount += metricCount + cmds[comdPos] = append(cmds[comdPos], cmd) + curPipe[comdPos]++ + + if curPipe[comdPos] == pipeline { + err := conn.Do(radix.Pipeline(cmds[comdPos]...)) + if err != nil { + log.Fatalf("Flush failed with %v", err) + } + metrics <- uint64(currMetricCount) + currMetricCount = 0 + cmds[comdPos] = make([]radix.CmdAction, 0, 0) + curPipe[comdPos] = 0 + } + } + for comdPos, u := range curPipe { + if u > 0 { + err := conn.Do(radix.Pipeline(cmds[comdPos]...)) + if err != nil { + log.Fatalf("Flush failed with %v", err) + } + metrics <- uint64(currMetricCount) + } + } + wg.Done() +} diff --git a/cmd/tsbs_run_queries_cassandra/query_executor.go b/cmd/tsbs_run_queries_cassandra/query_executor.go index 392fd67d2..28dcda5d3 100644 --- a/cmd/tsbs_run_queries_cassandra/query_executor.go +++ b/cmd/tsbs_run_queries_cassandra/query_executor.go @@ -90,7 +90,7 @@ func (qe *HLQueryExecutor) Do(q *HLQuery, opts HLQueryExecutorDoOptions) (qpLagM // optionally, print reponses for query validation: if opts.PrettyPrintResponses { for _, r := range results { - fmt.Fprintf(os.Stderr, "ID %d: [%s, %s] -> %v\n", q.GetID(), r.TimeInterval.Start(), r.TimeInterval.End(), r.Values) + fmt.Fprintf(os.Stdout, "ID %d: [%s, %s] -> %v\n", q.GetID(), r.TimeInterval.Start(), r.TimeInterval.End(), r.Values) } } return diff --git a/cmd/tsbs_run_queries_influx/http_client.go b/cmd/tsbs_run_queries_influx/http_client.go index 24b7b4827..fbfd1b33b 100644 --- a/cmd/tsbs_run_queries_influx/http_client.go +++ b/cmd/tsbs_run_queries_influx/http_client.go @@ -14,6 +14,7 @@ import ( ) var bytesSlash = []byte("/") // heap optimization +var headerAuthorization = "Authorization" // HTTPClient is a reusable HTTP Client. type HTTPClient struct { @@ -22,6 +23,7 @@ type HTTPClient struct { Host []byte HostString string uri []byte + authToken string } // HTTPClientDoOptions wraps options uses when calling `Do`. @@ -46,12 +48,17 @@ func getHttpClient() *http.Client { } // NewHTTPClient creates a new HTTPClient. -func NewHTTPClient(host string) *HTTPClient { +func NewHTTPClient(host string, authToken string) *HTTPClient { + token := "" + if authToken != "" { + token = fmt.Sprintf("Token %s", authToken) + } return &HTTPClient{ client: getHttpClient(), Host: []byte(host), HostString: host, uri: []byte{}, // heap optimization + authToken: token, } } @@ -74,7 +81,9 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, if err != nil { panic(err) } - + if w.authToken != "" { + req.Header.Add(headerAuthorization, w.authToken) + } // Perform the request while tracking latency: start := time.Now() resp, err := w.client.Do(req) diff --git a/cmd/tsbs_run_queries_influx/main.go b/cmd/tsbs_run_queries_influx/main.go index 48a84d757..8e96cb83f 100644 --- a/cmd/tsbs_run_queries_influx/main.go +++ b/cmd/tsbs_run_queries_influx/main.go @@ -20,6 +20,7 @@ import ( var ( daemonUrls []string chunkSize uint64 + authToken string ) // Global vars: @@ -35,6 +36,7 @@ func init() { pflag.String("urls", "http://localhost:8086", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") pflag.Uint64("chunk-response-size", 0, "Number of series to chunk results into. 0 means no chunking.") + pflag.String("auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB. If empty will not send the Authorization header.") pflag.Parse() @@ -49,8 +51,13 @@ func init() { } csvDaemonUrls = viper.GetString("urls") + authToken = viper.GetString("auth-token") chunkSize = viper.GetUint64("chunk-response-size") - + if authToken != "" { + log.Println("Using Authorization header in benchmark") + } else { + log.Println("Given no Authorization header was provided will not send it in benchmark") + } daemonUrls = strings.Split(csvDaemonUrls, ",") if len(daemonUrls) == 0 { log.Fatal("missing 'urls' flag") @@ -78,7 +85,7 @@ func (p *processor) Init(workerNumber int) { database: runner.DatabaseName(), } url := daemonUrls[workerNumber%len(daemonUrls)] - p.w = NewHTTPClient(url) + p.w = NewHTTPClient(url, authToken) } func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { diff --git a/cmd/tsbs_run_queries_questdb/http_client.go b/cmd/tsbs_run_queries_questdb/http_client.go new file mode 100644 index 000000000..daa19d815 --- /dev/null +++ b/cmd/tsbs_run_queries_questdb/http_client.go @@ -0,0 +1,133 @@ +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "strings" + "sync" + "time" + + "github.com/timescale/tsbs/pkg/query" +) + +var bytesSlash = []byte("/") // heap optimization + +// HTTPClient is a reusable HTTP Client. +type HTTPClient struct { + //client fasthttp.Client + client *http.Client + Host []byte + HostString string + uri []byte +} + +// HTTPClientDoOptions wraps options uses when calling `Do`. +type HTTPClientDoOptions struct { + Debug int + PrettyPrintResponses bool + chunkSize uint64 + database string +} + +var httpClientOnce = sync.Once{} +var httpClient *http.Client + +func getHttpClient() *http.Client { + httpClientOnce.Do(func() { + tr := &http.Transport{ + MaxIdleConnsPerHost: 1024, + } + httpClient = &http.Client{Transport: tr} + }) + return httpClient +} + +// NewHTTPClient creates a new HTTPClient. +func NewHTTPClient(host string) *HTTPClient { + if strings.HasSuffix(host, "/") { + host = host[:len(host)-1] + } + return &HTTPClient{ + client: getHttpClient(), + Host: []byte(host), + HostString: host, + uri: []byte{}, // heap optimization + } +} + +// Do performs the action specified by the given Query. It uses fasthttp, and +// tries to minimize heap allocations. +func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, err error) { + // populate uri from the reusable byte slice: + w.uri = w.uri[:0] + w.uri = append(w.uri, w.Host...) + w.uri = append(w.uri, q.Path...) + + // populate a request with data from the Query: + req, err := http.NewRequest(string(q.Method), string(w.uri), nil) + if err != nil { + panic(err) + } + + // Perform the request while tracking latency: + start := time.Now() + resp, err := w.client.Do(req) + if err != nil { + panic(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + panic("http request did not return status 200 OK") + } + + var body []byte + body, err = ioutil.ReadAll(resp.Body) + + if err != nil { + panic(err) + } + + lag = float64(time.Since(start).Nanoseconds()) / 1e6 // milliseconds + + if opts != nil { + // Print debug messages, if applicable: + switch opts.Debug { + case 1: + fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms\n", q.HumanLabel, lag) + case 2: + fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) + case 3: + fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) + fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String())) + case 4: + fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) + fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String())) + fmt.Fprintf(os.Stderr, "debug: response: %s\n", string(body)) + default: + } + + // Pretty print JSON responses, if applicable: + if opts.PrettyPrintResponses { + // Assumes the response is JSON! This holds for Influx + // and Elastic. + + prefix := fmt.Sprintf("ID %d: ", q.GetID()) + var v interface{} + var line []byte + full := make(map[string]interface{}) + full["influxql"] = string(q.RawQuery) + json.Unmarshal(body, &v) + full["response"] = v + line, err = json.MarshalIndent(full, prefix, " ") + if err != nil { + return + } + fmt.Println(string(line) + "\n") + } + } + + return lag, err +} diff --git a/cmd/tsbs_run_queries_questdb/main.go b/cmd/tsbs_run_queries_questdb/main.go new file mode 100644 index 000000000..69db180e0 --- /dev/null +++ b/cmd/tsbs_run_queries_questdb/main.go @@ -0,0 +1,143 @@ +// tsbs_run_queries_influx speed tests InfluxDB using requests from stdin. +// +// It reads encoded Query objects from stdin, and makes concurrent requests +// to the provided HTTP endpoint. This program has no knowledge of the +// internals of the endpoint. +package main + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "strings" + + "github.com/blagojts/viper" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/internal/utils" + "github.com/timescale/tsbs/pkg/query" +) + +// Program option vars: +var ( + daemonUrls []string +) + +// Global vars: +var ( + runner *query.BenchmarkRunner +) + +// Parse args: +func init() { + var config query.BenchmarkRunnerConfig + config.AddToFlagSet(pflag.CommandLine) + var csvDaemonUrls string + + pflag.String("urls", "http://localhost:9000/", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") + + pflag.Parse() + + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + if err := viper.Unmarshal(&config); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + + csvDaemonUrls = viper.GetString("urls") + + daemonUrls = strings.Split(csvDaemonUrls, ",") + if len(daemonUrls) == 0 { + log.Fatal("missing 'urls' flag") + } + + // Add an index to the hostname column in the cpu table + r, err := execQuery(daemonUrls[0], "show columns from cpu") + if err == nil && r.Count != 0 { + r, err := execQuery(daemonUrls[0], "ALTER TABLE cpu ALTER COLUMN hostname ADD INDEX") + _ = r + // fmt.Println("error:", err) + // fmt.Printf("%+v\n", r) + if err == nil { + fmt.Println("Added index to hostname column of cpu table") + } + } + + runner = query.NewBenchmarkRunner(config) +} + +func main() { + runner.Run(&query.HTTPPool, newProcessor) +} + +type processor struct { + w *HTTPClient + opts *HTTPClientDoOptions +} + +func newProcessor() query.Processor { return &processor{} } + +func (p *processor) Init(workerNumber int) { + p.opts = &HTTPClientDoOptions{ + Debug: runner.DebugLevel(), + PrettyPrintResponses: runner.DoPrintResponses(), + } + url := daemonUrls[workerNumber%len(daemonUrls)] + p.w = NewHTTPClient(url) +} + +func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) { + hq := q.(*query.HTTP) + lag, err := p.w.Do(hq, p.opts) + if err != nil { + return nil, err + } + stat := query.GetStat() + stat.Init(q.HumanLabelName(), lag) + return []*query.Stat{stat}, nil +} + +type QueryResponseColumns struct { + Name string + Type string +} + +type QueryResponse struct { + Query string + Columns []QueryResponseColumns + Dataset []interface{} + Count int + Error string +} + +func execQuery(uriRoot string, query string) (QueryResponse, error) { + var qr QueryResponse + if strings.HasSuffix(uriRoot, "/") { + uriRoot = uriRoot[:len(uriRoot)-1] + } + uriRoot = uriRoot + "/exec?query=" + url.QueryEscape(query) + resp, err := http.Get(uriRoot) + if err != nil { + return qr, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return qr, err + } + err = json.Unmarshal(body, &qr) + if err != nil { + return qr, err + } + if qr.Error != "" { + return qr, errors.New(qr.Error) + } + return qr, nil +} diff --git a/cmd/tsbs_run_queries_redistimeseries/cluster_conn.go b/cmd/tsbs_run_queries_redistimeseries/cluster_conn.go new file mode 100644 index 000000000..f320038ef --- /dev/null +++ b/cmd/tsbs_run_queries_redistimeseries/cluster_conn.go @@ -0,0 +1,33 @@ +package main + +import ( + "github.com/mediocregopher/radix/v3" + "log" +) + +func getOSSClusterConn(addr string, opts []radix.DialOpt, clients uint64) *radix.Cluster { + var vanillaCluster *radix.Cluster + var err error + + customConnFunc := func(network, addr string) (radix.Conn, error) { + return radix.Dial(network, addr, opts..., + ) + } + + // this cluster will use the ClientFunc to create a pool to each node in the + // cluster. + poolFunc := func(network, addr string) (radix.Client, error) { + return radix.NewPool(network, addr, int(clients), radix.PoolConnFunc(customConnFunc), radix.PoolPipelineWindow(0, 0)) + } + + vanillaCluster, err = radix.NewCluster([]string{addr}, radix.ClusterPoolFunc(poolFunc)) + if err != nil { + log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err) + } + // Issue CLUSTER SLOTS command + err = vanillaCluster.Sync() + if err != nil { + log.Fatalf("Error preparing for benchmark, while issuing CLUSTER SLOTS. error = %v", err) + } + return vanillaCluster +} diff --git a/cmd/tsbs_run_queries_redistimeseries/common.go b/cmd/tsbs_run_queries_redistimeseries/common.go new file mode 100644 index 000000000..d9b96e730 --- /dev/null +++ b/cmd/tsbs_run_queries_redistimeseries/common.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "github.com/mediocregopher/radix/v3" + "github.com/timescale/tsbs/pkg/query" + "log" + "strings" +) + +func inner_cmd_logic(p *processor, tq *query.RedisTimeSeries, idx int, replies [][]interface{}, commandArgs []string) error { + var err error = nil + if p.opts.debug { + fmt.Println(fmt.Sprintf("Issuing command (%s %s)", string(tq.CommandNames[idx]), strings.Join(commandArgs, " "))) + } + if clusterMode { + if string(tq.CommandNames[idx]) == "TS.MRANGE" || string(tq.CommandNames[idx]) == "TS.QUERYINDEX" || string(tq.CommandNames[idx]) == "TS.MGET" || string(tq.CommandNames[idx]) == "TS.MREVRANGE" { + rPos := r.Intn(len(conns)) + conn := conns[rPos] + err = conn.Do(radix.Cmd(&replies[idx], string(tq.CommandNames[idx]), commandArgs...)) + } else { + err = cluster.Do(radix.Cmd(&replies[idx], string(tq.CommandNames[idx]), commandArgs...)) + } + } else { + err = standalone.Do(radix.Cmd(&replies[idx], string(tq.CommandNames[idx]), commandArgs...)) + } + if err != nil { + log.Fatalf("Command (%s %s) failed with error: %v\n", string(tq.CommandNames[idx]), strings.Join(ByteArrayToStringArray(tq.RedisQueries[idx]), " "), err) + } + return err +} diff --git a/cmd/tsbs_run_queries_redistimeseries/debug.go b/cmd/tsbs_run_queries_redistimeseries/debug.go new file mode 100644 index 000000000..648d909aa --- /dev/null +++ b/cmd/tsbs_run_queries_redistimeseries/debug.go @@ -0,0 +1,54 @@ +package main + +import ( + "fmt" + "github.com/timescale/tsbs/pkg/query" +) + +func debug_print_redistimeseries_reply(reply [][]interface{}, idx int, tq *query.RedisTimeSeries) { + fmt.Println(fmt.Sprintf("Command reply. Total series %d", len(reply[idx]))) + for _, serie := range reply[idx] { + converted_serie := serie.([]interface{}) + serie_name := string(converted_serie[0].([]uint8)) + fmt.Println(fmt.Sprintf("\tSerie name: %s", serie_name)) + serie_labels := converted_serie[1].([]interface{}) + fmt.Println(fmt.Sprintf("\tSerie labels:")) + for _, kvpair := range serie_labels { + kvpairc := kvpair.([]interface{}) + k := string(kvpairc[0].([]uint8)) + v := string(kvpairc[1].([]uint8)) + fmt.Println(fmt.Sprintf("\t\t%s: %s", k, v)) + } + fmt.Println(fmt.Sprintf("\tSerie datapoints:")) + serie_datapoints := converted_serie[2].([]interface{}) + if string(tq.CommandNames[idx]) == "TS.MGET" { + ts := serie_datapoints[0].(int64) + v := serie_datapoints[1].(string) + fmt.Println(fmt.Sprintf("\t\tts: %d value: %s", ts, v)) + + } else { + for _, datapointpair := range serie_datapoints { + datapoint := datapointpair.([]interface{}) + ts := datapoint[0].(int64) + v := datapoint[1].(string) + fmt.Println(fmt.Sprintf("\t\tts: %d value: %s", ts, v)) + } + } + } +} + +func ByteArrayToInterfaceArray(qry [][]byte) []interface{} { + commandArgs := make([]interface{}, len(qry)) + for i := 0; i < len(qry); i++ { + commandArgs[i] = qry[i] + } + return commandArgs +} + +func ByteArrayToStringArray(qry [][]byte) []string { + commandArgs := make([]string, len(qry)) + for i := 0; i < len(qry); i++ { + commandArgs[i] = string(qry[i]) + } + return commandArgs +} diff --git a/cmd/tsbs_run_queries_redistimeseries/functors.go b/cmd/tsbs_run_queries_redistimeseries/functors.go new file mode 100644 index 000000000..52376653c --- /dev/null +++ b/cmd/tsbs_run_queries_redistimeseries/functors.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + "github.com/timescale/tsbs/pkg/query" + "strings" +) + +// specifically for Resultsets: high-cpu-1, high-cpu-all +// we need to take the reply timestamps and re-issue the query now with +// FILTER_BY_TS given the timestamps that passed the first query condition ( the FILTER_BY_VALUE ) +func highCpuFilterByTsFunctor(tq *query.RedisTimeSeries, replies [][]interface{}, idx int, commandArgs []string, p *processor, err error) error { + if len(replies[idx]) > 0 { + new_query := []string{commandArgs[0], commandArgs[1], "FILTER_BY_TS"} + first_serie := replies[idx][0] + serie_datapoints := first_serie.([]interface{})[2].([]interface{}) + if len(serie_datapoints) == 0 { + if p.opts.debug { + fmt.Println(fmt.Sprintf("Applying FILTER_BY_VALUE condition returned zero series")) + } + return err + } + for _, datapointpair := range serie_datapoints { + datapoint := datapointpair.([]interface{}) + ts := datapoint[0].(int64) + new_query = append(new_query, fmt.Sprintf("%d", ts)) + } + new_query = append(new_query, "FILTER") + for _, arg := range commandArgs[7 : len(commandArgs)-4] { + new_query = append(new_query, arg) + } + if p.opts.debug { + fmt.Println(fmt.Sprintf("Applying FILTER_BY_TS condition command (%s %s)", string(tq.CommandNames[idx]), strings.Join(new_query, " "))) + } + err = inner_cmd_logic(p, tq, idx, replies, new_query) + } else { + if p.opts.debug { + fmt.Println(fmt.Sprintf("Applying FILTER_BY_VALUE condition returned zero series")) + } + } + return err +} diff --git a/cmd/tsbs_run_queries_redistimeseries/main.go b/cmd/tsbs_run_queries_redistimeseries/main.go new file mode 100644 index 000000000..c909727b4 --- /dev/null +++ b/cmd/tsbs_run_queries_redistimeseries/main.go @@ -0,0 +1,146 @@ +// tsbs_run_queries_redistimeseries speed tests RedisTimeSeries using requests from stdin or file +// + +// This program has no knowledge of the internals of the endpoint. +package main + +import ( + "fmt" + "github.com/mediocregopher/radix/v3" + "math/rand" + "time" + + "github.com/blagojts/viper" + _ "github.com/lib/pq" + "github.com/spf13/pflag" + "github.com/timescale/tsbs/internal/utils" + "github.com/timescale/tsbs/pkg/query" +) + +// Program option vars: +var ( + host string + showExplain bool + clusterMode bool + cluster *radix.Cluster + standalone *radix.Pool + addresses []string + slots [][][2]uint16 + conns []radix.Client + r *rand.Rand +) + +// Global vars: +var ( + runner *query.BenchmarkRunner + cmdMrange = []byte("TS.MRANGE") + cmdMRevRange = []byte("TS.MREVRANGE") + cmdQueryIndex = []byte("TS.QUERYINDEX") + reflect_SingleGroupByTime = query.GetFunctionName(query.SingleGroupByTime) + reflect_GroupByTimeAndMax = query.GetFunctionName(query.GroupByTimeAndMax) + reflect_GroupByTimeAndTagMax = query.GetFunctionName(query.GroupByTimeAndTagMax) + reflect_GroupByTimeAndTagHostname = query.GetFunctionName(query.GroupByTimeAndTagHostname) + reflect_HighCpu = query.GetFunctionName(query.HighCpu) +) + +// Parse args: +func init() { + var config query.BenchmarkRunnerConfig + config.AddToFlagSet(pflag.CommandLine) + + pflag.StringVar(&host, "host", "localhost:6379", "Redis host address and port") + pflag.BoolVar(&clusterMode, "cluster", false, "Whether to use OSS cluster API") + pflag.Parse() + + err := utils.SetupConfigFile() + + if err != nil { + panic(fmt.Errorf("fatal error config file: %s", err)) + } + + if err := viper.Unmarshal(&config); err != nil { + panic(fmt.Errorf("unable to decode config: %s", err)) + } + + s := rand.NewSource(time.Now().Unix()) + r = rand.New(s) // initialize local pseudorandom generator + + opts := make([]radix.DialOpt, 0) + opts = append(opts, radix.DialReadTimeout(120*time.Second)) + if clusterMode { + cluster = getOSSClusterConn(host, opts, uint64(config.Workers)) + cluster.Sync() + topology := cluster.Topo().Primaries().Map() + addresses = make([]string, 0) + slots = make([][][2]uint16, 0) + conns = make([]radix.Client, 0) + for nodeAddress, node := range topology { + addresses = append(addresses, nodeAddress) + slots = append(slots, node.Slots) + conn, _ := cluster.Client(nodeAddress) + conns = append(conns, conn) + } + } else { + standalone = getStandaloneConn(host, opts, uint64(config.Workers)) + } + runner = query.NewBenchmarkRunner(config) +} +func main() { + runner.Run(&query.RedisTimeSeriesPool, newProcessor) +} + +type queryExecutorOptions struct { + showExplain bool + debug bool + printResponse bool +} + +type processor struct { + opts *queryExecutorOptions +} + +func newProcessor() query.Processor { return &processor{} } + +func (p *processor) Init(numWorker int) { + p.opts = &queryExecutorOptions{ + showExplain: showExplain, + debug: runner.DebugLevel() > 0, + printResponse: runner.DoPrintResponses(), + } +} + +func (p *processor) ProcessQuery(q query.Query, isWarm bool) (queryStats []*query.Stat, err error) { + + // No need to run again for EXPLAIN + if isWarm && p.opts.showExplain { + return nil, nil + } + tq := q.(*query.RedisTimeSeries) + + var cmds = make([][]string, 0, 0) + var replies = make([][]interface{}, 0, 0) + for _, qry := range tq.RedisQueries { + cmds = append(cmds, ByteArrayToStringArray(qry)) + replies = append(replies, []interface{}{}) + } + + start := time.Now() + for idx, commandArgs := range cmds { + err := inner_cmd_logic(p, tq, idx, replies, commandArgs) + if tq.Functor == "FILTER_BY_TS" { + err = highCpuFilterByTsFunctor(tq, replies, idx, commandArgs, p, err) + } + if err != nil { + return nil, err + } + if p.opts.debug { + debug_print_redistimeseries_reply(replies, idx, tq) + } + } + took := float64(time.Since(start).Nanoseconds()) / 1e6 + + stat := query.GetStat() + stat.Init(q.HumanLabelName(), took) + queryStats = []*query.Stat{stat} + return queryStats, err +} diff --git a/cmd/tsbs_run_queries_redistimeseries/standalone_conn.go b/cmd/tsbs_run_queries_redistimeseries/standalone_conn.go new file mode 100644 index 000000000..01f34959d --- /dev/null +++ b/cmd/tsbs_run_queries_redistimeseries/standalone_conn.go @@ -0,0 +1,22 @@ +package main + +import ( + "github.com/mediocregopher/radix/v3" + "log" +) + +func getStandaloneConn(addr string, opts []radix.DialOpt, clients uint64) *radix.Pool { + var pool *radix.Pool + var err error + + customConnFunc := func(network, addr string) (radix.Conn, error) { + return radix.Dial(network, addr, opts..., + ) + } + network := "tcp" + pool, err = radix.NewPool(network, addr, int(clients), radix.PoolConnFunc(customConnFunc), radix.PoolPipelineWindow(0, 0)) + if err != nil { + log.Fatalf("Error preparing for benchmark, while creating new connection. error = %v", err) + } + return pool +} diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 000000000..647d8fe9f --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,17 @@ +FROM golang:1.13.5-alpine AS builder + +# Copy the code from the host and compile it +WORKDIR $GOPATH/src/github.com/timescale/tsbs +COPY . ./ +RUN apk add --no-cache git make bash +RUN make all + +FROM golang:1.13.5-alpine +# install bash shell +RUN apk add --update bash && rm -rf /var/cache/apk/* + +ENV PATH ./:$PATH +COPY --from=builder $GOPATH/src/github.com/timescale/tsbs/bin/tsbs_* ./ +COPY ./docker/docker_entrypoint.sh ./ +RUN chmod 751 docker_entrypoint.sh +ENTRYPOINT ["./docker_entrypoint.sh"] \ No newline at end of file diff --git a/docker/docker_entrypoint.sh b/docker/docker_entrypoint.sh new file mode 100644 index 000000000..27bddcdf1 --- /dev/null +++ b/docker/docker_entrypoint.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +# Ensure runner is available +EXE_FILE_NAME=$(which $1) +if [[ -z "$EXE_FILE_NAME" ]]; then + echo "$1 not available. It is not specified explicitly and not found in \$PATH" + exit 1 +else + "$@" + exit 0 +fi diff --git a/docs/questdb.md b/docs/questdb.md new file mode 100644 index 000000000..affdb0e3f --- /dev/null +++ b/docs/questdb.md @@ -0,0 +1,162 @@ +# TSBS Supplemental Guide: QuestDB + +QuestDB is a high-performance open-source time series database with SQL as a +query language with time-oriented extensions. QuestDB implements PostgreSQL wire +protocol, a REST API, and supports ingestion using InfluxDB line protocol. + +This guide explains how the data for TSBS is generated along with additional +flags available when using the data importer (`tsbs_load_questdb`). +**This should be read _after_ the main README.** + +## Data format + +Data generated by `tsbs_generate_data` is in InfluxDB line protocol format where each +reading is composed of the following: + +- the table name followed by a comma +- several comma-separated items of tags in the format `