Skip to content

Commit 13c13f9

Browse files
akvladlmangani
andauthored
custom formatters (metrico#217)
* protobuf ingestion test * debug * fix test node14 * remove logs Co-authored-by: Lorenzo Mangani <[email protected]>
1 parent 3984611 commit 13c13f9

File tree

7 files changed

+151
-12
lines changed

7 files changed

+151
-12
lines changed

lib/handlers/common.js

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
const eng = require('../../plugins/engine')
2+
const { parseCliQL } = require('../cliql')
3+
const { Transform } = require('stream')
4+
const { scanClickhouse, scanFingerprints } = require('../db/clickhouse')
5+
6+
module.exports.checkCustomPlugins = async (options) => {
7+
options.API = options.API || {
8+
logql: async (query, start, end, limit) => {
9+
const params = {
10+
query,
11+
start,
12+
end,
13+
limit,
14+
direction: 'backward',
15+
step: '60s'
16+
}
17+
const req = {
18+
query: params
19+
}
20+
const res = new Transform({
21+
transform (chunk, encoding, callback) {
22+
callback(null, chunk)
23+
}
24+
})
25+
res.writeHead = () => {}
26+
const cliqlParams = parseCliQL(req.query.query)
27+
if (cliqlParams) {
28+
scanClickhouse(cliqlParams, { res }, params)
29+
} else {
30+
await scanFingerprints(
31+
req.query,
32+
{ res: res }
33+
)
34+
}
35+
let str = ''
36+
res.on('data', (d) => {
37+
str += d
38+
})
39+
await new Promise((resolve, reject) => {
40+
res.once('error', reject)
41+
res.once('close', resolve)
42+
res.once('end', resolve)
43+
})
44+
return JSON.parse(str)
45+
}/* ,
46+
promql: async () => {
47+
48+
} */
49+
}
50+
const plugins = eng.getPlg({ type: 'custom_processor' })
51+
for (const plugin of Object.values(plugins)) {
52+
for (const e of Object.entries(options)) {
53+
plugin[e[0]] = e[1]
54+
}
55+
if (plugin.check()) {
56+
return await plugin.process()
57+
}
58+
}
59+
}

lib/handlers/query_range.js

+17-11
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
*/
1111

1212
const { parseCliQL } = require('../cliql')
13+
const { checkCustomPlugins } = require('./common')
1314

1415
async function handler (req, res) {
1516
req.log.debug('GET /loki/api/v1/query_range')
@@ -28,17 +29,22 @@ async function handler (req, res) {
2829
const cliqlParams = parseCliQL(req.query.query)
2930
if (cliqlParams) {
3031
this.scanClickhouse(cliqlParams, res, params)
31-
} else {
32-
try {
33-
await this.scanFingerprints(
34-
req.query,
35-
{ res: res.raw }
36-
)
37-
res.hijack()
38-
} catch (err) {
39-
req.log.error({ err })
40-
res.send(resp)
41-
}
32+
return
33+
}
34+
const pluginOut = await checkCustomPlugins(req.query)
35+
if (pluginOut) {
36+
res.header('Content-Type', pluginOut.type)
37+
return res.send(pluginOut.out)
38+
}
39+
try {
40+
await this.scanFingerprints(
41+
req.query,
42+
{ res: res.raw }
43+
)
44+
res.hijack()
45+
} catch (err) {
46+
req.log.error({ err })
47+
res.send(resp)
4248
}
4349
}
4450

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
const { PluginTypeLoaderBase } = require('plugnplay')
2+
module.exports = class extends PluginTypeLoaderBase {
3+
exportSync (opts) {
4+
return {
5+
props: ['check', 'process'],
6+
validate: (exports) => {
7+
return exports
8+
}
9+
}
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
id: custom_processor
2+
name: Custom Processor Plugin
3+
description: plugin to custom process a logql / promql request
4+
loader: index.js

plugins/output_format/index.js

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
const { PluginLoaderBase } = require('plugnplay')
2+
3+
/**
4+
* @class Plugin
5+
* @property {string} query
6+
* @property start {number} start in NS
7+
* @property end {string} end in NS
8+
* @property type {string} promql or logql
9+
* @property limit {number}
10+
* @property {{
11+
* logql: (query: string, startNS: number, endNS: number, limit: number) => Promise<Object>
12+
* }} API
13+
* promql: (query: string, startNS: number, endNS: number, limit: number) => Promise<Object> //not implemented
14+
*/
15+
class Plugin {
16+
/**
17+
* @method
18+
* @name check
19+
* @this {Plg}
20+
* @returns {boolean} if this plugin is usable for the query
21+
*/
22+
check () {
23+
return this.query.match(/^toCsv\(.+\)\s*$/)
24+
}
25+
26+
/**
27+
* @method
28+
* @name process
29+
* @this {Plg}
30+
* @returns {Promise<{type: string, out: string}>} The raw output
31+
*/
32+
async process () {
33+
const match = this.query.match(/^toCsv\((.+)\)$/)
34+
const response = await this.API.logql(match[1], this.start, this.end, this.limit)
35+
let res = ''
36+
for (const stream of response.data.result) {
37+
const labels = JSON.stringify(stream.stream)
38+
for (const val of stream.values) {
39+
res += `${labels}\t${val[0]}\t${val[1]}\n`
40+
}
41+
}
42+
return {
43+
type: 'text/csv',
44+
out: res
45+
}
46+
}
47+
}
48+
class Plg extends PluginLoaderBase {
49+
exportSync (api) {
50+
return new Plugin()
51+
}
52+
}
53+
54+
module.exports = Plg

plugins/output_format/plugnplay.yml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
id: output_format
2+
name: Format Output
3+
description: Change output format
4+
loader: index.js
5+
type: custom_processor

test/e2e

Submodule e2e updated from 8ecfe0f to 97b3702

0 commit comments

Comments
 (0)