Skip to content

Commit 8fa69d2

Browse files
committed
promql optimization: rate & sum
1 parent b592684 commit 8fa69d2

9 files changed

+90
-96
lines changed

promql/index.js

+1
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ module.exports.getData = async (matchers, fromMs, toMs, subqueries) => {
121121
const db = DATABASE_NAME()
122122
const subq = (subqueries || {})[getMetricName(matchers)]
123123
if (subq) {
124+
console.log(subq)
124125
const data = await rawRequest(subq + ' FORMAT RowBinary',
125126
null, db, { responseType: 'arraybuffer' })
126127
return new Uint8Array(data.data)

wasm_parts/main.go

+32-77
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,9 @@ func pql(id uint32, c *ctx, optimizable bool,
274274
}
275275
}
276276

277+
maxDurationMs := getMaxDurationMs(rq.Statement())
278+
fromMs -= maxDurationMs
279+
277280
subsels := strings.Builder{}
278281
subsels.WriteString("{")
279282
if optimizable {
@@ -291,15 +294,18 @@ func pql(id uint32, c *ctx, optimizable bool,
291294
if i != 0 {
292295
subsels.WriteString(",")
293296
}
294-
subsels.WriteString(fmt.Sprintf(`"%s":"%s"`, strconv.Quote(k), strconv.Quote(v)))
297+
subsels.WriteString(fmt.Sprintf(`%s:%s`, strconv.Quote(k), strconv.Quote(v)))
295298
i++
296299
}
297300
}
298301
subsels.WriteString("}")
299302

300303
matchersJSON := getmatchersJSON(rq)
301304

302-
c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s}`, subsels.String(), matchersJSON))
305+
c.response = []byte(fmt.Sprintf(`{"subqueries": %s, "matchers": %s, "fromMs": %d}`,
306+
subsels.String(),
307+
matchersJSON,
308+
fromMs))
303309
c.onDataLoad = func(c *ctx) {
304310
ctxId := gcContext.GetContextID()
305311
gcContext.SetContext(id)
@@ -312,6 +318,21 @@ func pql(id uint32, c *ctx, optimizable bool,
312318
return 0
313319
}
314320

321+
func getMaxDurationMs(q parser.Node) int64 {
322+
maxDurationMs := int64(0)
323+
for _, c := range parser.Children(q) {
324+
_m := getMaxDurationMs(c)
325+
if _m > maxDurationMs {
326+
maxDurationMs = _m
327+
}
328+
}
329+
ms, _ := q.(*parser.MatrixSelector)
330+
if ms != nil && maxDurationMs < ms.Range.Milliseconds() {
331+
return ms.Range.Milliseconds()
332+
}
333+
return maxDurationMs
334+
}
335+
315336
func optimizeQuery(q promql.Query, fromMs int64, toMs int64, stepMs int64) (map[string]string, promql.Query, error) {
316337
appliableNodes := findAppliableNodes(q.Statement(), nil)
317338
var err error
@@ -352,7 +373,7 @@ func optimizeQuery(q promql.Query, fromMs int64, toMs int64, stepMs int64) (map[
352373
IsCluster: false,
353374
From: time.Unix(0, fromMs*1000000),
354375
To: time.Unix(0, toMs*1000000),
355-
Step: time.Millisecond * time.Duration(stepMs),
376+
Step: time.Millisecond * 15000, /*time.Duration(stepMs)*/
356377
TimeSeriesTable: "time_series",
357378
TimeSeriesDistTable: "time_series_dist",
358379
TimeSeriesGinTable: "time_series_gin",
@@ -448,76 +469,6 @@ func writeVector(v promql.Vector) string {
448469
}
449470

450471
func main() {
451-
queriable := &TestQueryable{id: 0, stepMs: 15000}
452-
rq, err := getEng().NewRangeQuery(
453-
queriable,
454-
nil,
455-
"histogram_quantile(0.5, sum by (container) (rate(network_usage{container=~\"awesome\"}[5m])))",
456-
time.Now().Add(time.Hour*-24),
457-
time.Now(),
458-
time.Millisecond*time.Duration(15000))
459-
if err != nil {
460-
panic(err)
461-
}
462-
matchers := findAppliableNodes(rq.Statement(), nil)
463-
for _, m := range matchers {
464-
fmt.Println(m)
465-
opt := m.optimizer
466-
opt = &promql2.FinalizerOptimizer{
467-
SubOptimizer: opt,
468-
}
469-
opt, err = promql2.PlanOptimize(m.node, opt)
470-
if err != nil {
471-
panic(err)
472-
}
473-
planner, err := opt.Optimize(m.node)
474-
if err != nil {
475-
panic(err)
476-
}
477-
fakeMetric := fmt.Sprintf("fake_metric_%d", time.Now().UnixNano())
478-
fmt.Println(rq.Statement())
479-
swapChild(m.parent, m.node, &parser.VectorSelector{
480-
Name: fakeMetric,
481-
OriginalOffset: 0,
482-
Offset: 0,
483-
Timestamp: nil,
484-
StartOrEnd: 0,
485-
LabelMatchers: []*labels.Matcher{
486-
{
487-
Type: labels.MatchEqual,
488-
Name: "__name__",
489-
Value: fakeMetric,
490-
},
491-
},
492-
UnexpandedSeriesSet: nil,
493-
Series: nil,
494-
PosRange: parser.PositionRange{},
495-
})
496-
fmt.Println(rq.Statement())
497-
sel, err := planner.Process(&shared2.PlannerContext{
498-
IsCluster: false,
499-
From: time.Now().Add(time.Hour * -24),
500-
To: time.Now(),
501-
Step: time.Millisecond * time.Duration(15000),
502-
TimeSeriesTable: "time_series",
503-
TimeSeriesDistTable: "time_series_dist",
504-
TimeSeriesGinTable: "time_series_gin",
505-
MetricsTable: "metrics_15s",
506-
MetricsDistTable: "metrics_15s_dist",
507-
})
508-
if err != nil {
509-
panic(err)
510-
}
511-
strSel, err := sel.String(&sql.Ctx{
512-
Params: map[string]sql.SQLObject{},
513-
Result: map[string]sql.SQLObject{},
514-
})
515-
if err != nil {
516-
panic(err)
517-
}
518-
println(strSel)
519-
}
520-
521472
}
522473

523474
func getOptimizer(n parser.Node) promql2.IOptimizer {
@@ -696,10 +647,11 @@ type TestSeries struct {
696647
data []byte
697648
stepMs int64
698649

699-
labels labels.Labels
700-
tsMs int64
701-
val float64
702-
i int
650+
labels labels.Labels
651+
tsMs int64
652+
val float64
653+
lastValTs int64
654+
i int
703655

704656
state int
705657
}
@@ -721,11 +673,14 @@ func (t *TestSeries) Next() bool {
721673
t.tsMs += t.stepMs
722674
if t.tsMs >= ts {
723675
t.state = 0
676+
} else if t.lastValTs+300000 < t.tsMs {
677+
t.state = 0
724678
}
725679
}
726680
if t.state == 0 {
727681
t.tsMs = ts
728682
t.val = *(*float64)(unsafe.Pointer(&t.data[t.i*16+8]))
683+
t.lastValTs = t.tsMs
729684
t.i++
730685
t.state = 1
731686
}

wasm_parts/main.js

+5-4
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
6262
const step = stepMs || 15000
6363
return await pql(query,
6464
(ctx) => _wasm.exportsWrap.pqlRangeQuery(ctx.id, start, end, step, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
65-
(matchers, subq) => getData(matchers, start, end, subq))
65+
(matchers, subq, startMs) => getData(matchers, startMs, end, subq))
6666
}
6767

6868
/**
@@ -75,9 +75,10 @@ module.exports.pqlRangeQuery = async (query, startMs, endMs, stepMs, getData) =>
7575
module.exports.pqlInstantQuery = async (query, timeMs, getData) => {
7676
const time = timeMs || Date.now()
7777
const _wasm = getWasm()
78+
const start = time - 300000
7879
return await pql(query,
7980
(ctx) => _wasm.exportsWrap.pqlInstantQuery(ctx.id, time, process.env.EXPERIMENTAL_PROMQL_OPTIMIZE ? 1 : 0),
80-
(matchers, subq) => getData(matchers, time - 300000, time, subq))
81+
(matchers, subq, start) => getData(matchers, start, time, subq))
8182
}
8283

8384
module.exports.pqlMatchers = (query) => {
@@ -162,8 +163,8 @@ const pql = async (query, wasmCall, getData) => {
162163
const matchersObj = JSON.parse(ctx.read())
163164

164165
const matchersResults = await Promise.all(
165-
matchersObj.map(async (matchers, i) => {
166-
const data = await getData(matchers.matchers, matchers.subqueries)
166+
matchersObj.matchers.map(async (matchers, i) => {
167+
const data = await getData(matchers, matchersObj.subqueries, matchersObj.fromMs)
167168
return { matchers, data }
168169
}))
169170

wasm_parts/main.wasm.gz

38.4 KB
Binary file not shown.

wasm_parts/promql/planners/finalize.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,16 @@ func (f *FinalizePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, erro
3232
withMain := sql.NewWith(main, "pre_final")
3333
res := sql.NewSelect().With(withMain).Select(withMain).
3434
Select(
35-
sql.NewSimpleCol("pre_final.fingerprint", "fingerprint"),
3635
sql.NewSimpleCol(withLabels.GetAlias()+".labels", "labels"),
3736
sql.NewSimpleCol("arraySort(groupArray((pre_final.timestamp_ms, pre_final.value)))", "values"),
3837
).From(sql.NewWithRef(withMain)).
38+
//AndWhere(sql.Neq(sql.NewRawObject("pre_final.value"), sql.NewIntVal(0))).
3939
Join(sql.NewJoin(
4040
"ANY LEFT",
4141
sql.NewWithRef(withLabels),
4242
sql.Eq(
4343
sql.NewRawObject("pre_final.fingerprint"),
4444
sql.NewRawObject(withLabels.GetAlias()+".new_fingerprint")))).
45-
GroupBy(sql.NewRawObject("pre_final.fingerprint"), sql.NewRawObject(withLabels.GetAlias()+".labels"))
45+
GroupBy(sql.NewRawObject(withLabels.GetAlias() + ".labels"))
4646
return res, nil
4747
}

wasm_parts/promql/planners/metrics_extend.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,25 @@ func (m *MetricsExtendPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect,
2222
withMain := sql.NewWith(main, "pre_extend")
2323
extendedCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
2424
return fmt.Sprintf(
25-
"argMaxIf(value, timestamp_ms, isNaN(value) = 0) OVER ("+
25+
"argMaxIf(value, timestamp_ms, pre_extend.original = 1) OVER ("+
26+
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
27+
")", extendCnt), nil
28+
})
29+
origCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
30+
return fmt.Sprintf(
31+
"max(original) OVER ("+
2632
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
2733
")", extendCnt), nil
2834
})
2935
extend := sql.NewSelect().With(withMain).
3036
Select(
3137
sql.NewSimpleCol("fingerprint", "fingerprint"),
3238
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
33-
sql.NewCol(extendedCol, "value")).
39+
sql.NewCol(extendedCol, "value"),
40+
sql.NewCol(origCol, "original")).
3441
From(sql.NewWithRef(withMain))
35-
return extend, nil
42+
withExtend := sql.NewWith(extend, "extend")
43+
return sql.NewSelect().With(withExtend).Select(sql.NewRawObject("*")).
44+
From(sql.NewWithRef(withExtend)).
45+
AndWhere(sql.Eq(sql.NewRawObject("original"), sql.NewIntVal(1))), nil
3646
}

wasm_parts/promql/planners/metrics_rate.go

+26-5
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,39 @@ func (m *RatePlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, error) {
3434
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
3535
")", rateCnt), nil
3636
})
37-
valueCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
37+
resetCol := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
3838
return fmt.Sprintf(
39-
"if(last > first, last - first, last) / %f", m.Duration.Seconds()), nil
39+
"if(value < (any(value) OVER (" +
40+
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING" +
41+
") as lastValue), lastValue, 0)"), nil
4042
})
41-
extend := sql.NewSelect().With(withMain).
43+
reset := sql.NewSelect().With(withMain).
44+
Select(
45+
sql.NewSimpleCol("fingerprint", "fingerprint"),
46+
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
47+
sql.NewCol(resetCol, "reset"),
48+
sql.NewSimpleCol("value", "value")).
49+
From(sql.NewWithRef(withMain))
50+
withReset := sql.NewWith(reset, "pre_reset")
51+
resetColSum := sql.NewCustomCol(func(ctx *sql.Ctx, options ...int) (string, error) {
52+
_rateCnt := rateCnt - 1
53+
if rateCnt <= 1 {
54+
_rateCnt = 1
55+
}
56+
return fmt.Sprintf(
57+
"sum(reset) OVER ("+
58+
"PARTITION BY fingerprint ORDER BY timestamp_ms ROWS BETWEEN %d PRECEDING AND CURRENT ROW"+
59+
")", _rateCnt), nil
60+
})
61+
extend := sql.NewSelect().With(withReset).
4262
Select(
4363
sql.NewSimpleCol("fingerprint", "fingerprint"),
4464
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
4565
sql.NewCol(lastCol, "last"),
4666
sql.NewCol(firstCol, "first"),
47-
sql.NewCol(valueCol, "_value")).
48-
From(sql.NewWithRef(withMain))
67+
sql.NewCol(resetColSum, "reset"),
68+
sql.NewSimpleCol(fmt.Sprintf("(last - first + reset) / %f", m.Duration.Seconds()), "_value")).
69+
From(sql.NewWithRef(withReset))
4970
withExtend := sql.NewWith(extend, "rate")
5071
return sql.NewSelect().
5172
With(withExtend).

wasm_parts/promql/planners/metrics_raw_init.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ func (m *MetricsInitPlanner) Process(ctx *shared.PlannerContext) (sql.ISelect, e
2626
return sql.NewSelect().With(withFpReq).Select(
2727
sql.NewSimpleCol("fingerprint", "fingerprint"),
2828
sql.NewCol(tsNsCol, "timestamp_ms"),
29-
sql.NewCol(m.ValueCol, "value")).
29+
sql.NewCol(m.ValueCol, "value"),
30+
sql.NewSimpleCol("1::UInt8", "original")).
3031
From(sql.NewSimpleCol(ctx.MetricsTable, "metrics")).
3132
AndWhere(
3233
sql.Ge(sql.NewRawObject("timestamp_ns"), sql.NewIntVal(ctx.From.UnixNano())),

wasm_parts/promql/planners/metrics_zerofill.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@ func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelec
1515
if err != nil {
1616
return nil, err
1717
}
18-
withMain := sql.NewWith(main, "prezerofill")
18+
main.OrderBy(sql.NewRawObject("fingerprint"), sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
19+
return fmt.Sprintf("timestamp_ms WITH FILL FROM %d TO %d STEP %d",
20+
ctx.From.UnixMilli(), ctx.To.UnixMilli(), ctx.Step.Milliseconds()), nil
21+
}))
22+
return main, nil
23+
/*withMain := sql.NewWith(main, "prezerofill")
1924
arrLen := (ctx.To.UnixNano()-ctx.From.UnixNano())/ctx.Step.Nanoseconds() + 1
2025
zeroFillCol := sql.NewCustomCol(func(_ *sql.Ctx, options ...int) (string, error) {
2126
return fmt.Sprintf("groupArrayInsertAt(nan, %d)(value, toUInt32(intDiv(timestamp_ms - %d, %d)))",
@@ -37,9 +42,9 @@ func (m *MetricsZeroFillPlanner) Process(ctx *shared.PlannerContext) (sql.ISelec
3742
postZeroFill := sql.NewSelect().With(withZeroFill).
3843
Select(
3944
sql.NewSimpleCol("fingerprint", "fingerprint"),
40-
sql.NewSimpleCol("val.1", "timestamp_ms"),
45+
sql.NewSimpleCol("timestamp_ms", "timestamp_ms"),
4146
sql.NewSimpleCol("val.2", "value")).
42-
From(sql.NewWithRef(withZeroFill)).
47+
From(sql.NewWithRef(withMain)).
4348
Join(sql.NewJoin("array", sql.NewCol(joinZeroFillStmt, "val"), nil))
44-
return postZeroFill, nil
49+
return postZeroFill, nil*/
4550
}

0 commit comments

Comments
 (0)