|
1 |
| -const { getDuration, preJoinLabels, dist } = require('../common') |
| 1 | +const { getDuration, preJoinLabels, dist, sharedParamNames } = require('../common') |
2 | 2 | const reg = require('./log_range_agg_reg_v3_2')
|
3 | 3 | const Sql = require('@cloki/clickhouse-sql')
|
4 | 4 | const { DATABASE_NAME, checkVersion } = require('../../../lib/utils')
|
@@ -44,24 +44,30 @@ function isLogPipeline (token) {
|
44 | 44 | */
|
45 | 45 | module.exports.apply = (token, fromNS, toNS, stepNS) => {
|
46 | 46 | fromNS = Math.floor(fromNS / 15000000000) * 15000000000
|
| 47 | + const fromParam = new Sql.Parameter(sharedParamNames.from) |
| 48 | + const toParam = new Sql.Parameter(sharedParamNames.to) |
47 | 49 | const tsClause = toNS
|
48 | 50 | ? Sql.between('samples.timestamp_ns', fromNS, toNS)
|
49 | 51 | : Sql.Gt('samples.timestamp_ns', fromNS)
|
50 | 52 | let q = (new Sql.Select())
|
51 | 53 | .select(['samples.fingerprint', 'fingerprint'])
|
52 | 54 | .from([`${DATABASE_NAME()}.metrics_15s${_dist}`, 'samples'])
|
53 | 55 | .where(tsClause)
|
| 56 | + .addParam(fromParam) |
| 57 | + .addParam(toParam) |
| 58 | + fromParam.set(fromNS) |
| 59 | + toParam.set(toNS) |
54 | 60 |
|
55 | 61 | q.ctx = {
|
56 | 62 | step: stepNS / 1000000000,
|
57 | 63 | inline: !!clusterName
|
58 | 64 | }
|
59 | 65 |
|
60 |
| - preJoinLabels(token, q, dist) |
61 |
| - |
62 | 66 | for (const streamSelectorRule of token.Children('log_stream_selector_rule')) {
|
63 | 67 | q = streamSelectorReg[streamSelectorRule.Child('operator').value](streamSelectorRule, q)
|
64 | 68 | }
|
| 69 | + preJoinLabels(token, q, dist) |
| 70 | + q = q.groupBy('labels') |
65 | 71 |
|
66 | 72 | const lra = token.Child('log_range_aggregation')
|
67 | 73 | q = reg[lra.Child('log_range_aggregation_fn').value](lra, q)
|
|
0 commit comments