Skip to content

Commit b2e906e

Browse files
committed
#feat: bun support
1 parent dd9eaa2 commit b2e906e

37 files changed

+664
-122
lines changed

.github/workflows/node-clickhouse.js.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,4 @@ jobs:
4343
CLICKHOUSE_TSDB: loki
4444
INTEGRATION_E2E: 1
4545
CLOKI_EXT_URL: 127.0.0.1:3100
46-
run: node qryn.js >/dev/stdout & npm run test --forceExit
46+
run: node qryn.mjs >/dev/stdout & npm run test --forceExit

common.js

+2
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,5 @@ module.exports.isCustomSamplesOrderingRule = () => {
125125
module.exports.CORS = process.env.CORS_ALLOW_ORIGIN || '*'
126126

127127
module.exports.clusterName = process.env.CLUSTER_NAME
128+
129+
module.exports.readonly = process.env.READONLY || false

docker/docker-compose-centos.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,4 @@ services:
3939
container_name: centos
4040
volumes:
4141
- ../:/opt/qryn
42-
entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.js'
42+
entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.mjs'

lib/bun_wrapper.js

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
const { Transform } = require('stream')
2+
const log = require('./logger')
3+
const { EventEmitter } = require('events')
4+
5+
class BodyStream extends Transform {
6+
_transform (chunk, encoding, callback) {
7+
callback(null, chunk)
8+
}
9+
10+
once (event, listerer) {
11+
const self = this
12+
const _listener = (e) => {
13+
listerer(e)
14+
self.removeListener(event, _listener)
15+
}
16+
this.on(event, _listener)
17+
}
18+
}
19+
20+
const wrapper = (handler, parsers) => {
21+
/**
22+
* @param ctx {Request}
23+
*/
24+
const res = async (ctx, server) => {
25+
let response = ''
26+
let status = 200
27+
let reqBody = ''
28+
let headers = {}
29+
30+
const stream = new BodyStream()
31+
setTimeout(async () => {
32+
if (!ctx.body) {
33+
stream.end()
34+
return
35+
}
36+
for await (const chunk of ctx.body) {
37+
stream.write(chunk)
38+
}
39+
stream.end()
40+
})
41+
const req = {
42+
headers: Object.fromEntries(ctx.headers.entries()),
43+
raw: stream,
44+
log: log,
45+
params: ctx.params || {},
46+
query: {}
47+
}
48+
for (const [key, value] of (new URL(ctx.url)).searchParams) {
49+
if (!(key in req.query)) {
50+
req.query[key] = value
51+
continue
52+
}
53+
req.query[key] = Array.isArray(req.query[key])
54+
? [...req.query[key], value]
55+
: [req.query[key], value]
56+
}
57+
const res = {
58+
send: (msg) => {
59+
response = msg
60+
},
61+
code: (code) => {
62+
status = code
63+
return res
64+
},
65+
header: (key, value) => {
66+
headers[key] = value
67+
return res
68+
},
69+
headers: (hdrs) => {
70+
headers = { ...headers, ...hdrs }
71+
return res
72+
}
73+
}
74+
75+
if (parsers) {
76+
const contentType = (ctx.headers.get('Content-Type') || '')
77+
let ok = false
78+
for (const [type, parser] of Object.entries(parsers)) {
79+
if (type !== '*' && contentType.indexOf(type) > -1) {
80+
log.debug(`parsing ${type}`)
81+
reqBody = await parser(req, stream)
82+
ok = true
83+
log.debug(`parsing ${type} ok`)
84+
}
85+
}
86+
if (!ok && parsers['*']) {
87+
log.debug('parsing *')
88+
reqBody = await parsers['*'](req, stream)
89+
ok = true
90+
log.debug('parsing * ok')
91+
}
92+
if (!ok) {
93+
throw new Error('undefined content type ' + contentType)
94+
}
95+
}
96+
97+
req.body = reqBody || stream
98+
99+
let result = handler(req, res)
100+
if (result && result.then) {
101+
result = await result
102+
}
103+
if (result && result.on) {
104+
response = ''
105+
result.on('data', (d) => {
106+
response += d
107+
})
108+
await new Promise((resolve, reject) => {
109+
result.on('end', resolve)
110+
result.on('error', reject)
111+
result.on('close', resolve)
112+
})
113+
result = null
114+
}
115+
if (result) {
116+
response = result
117+
}
118+
if (response instanceof Object && typeof response !== 'string') {
119+
response = JSON.stringify(response)
120+
}
121+
return new Response(response, { status: status, headers: headers })
122+
}
123+
return res
124+
}
125+
126+
const wsWrapper = (handler) => {
127+
/**
128+
* @param ctx {Request}
129+
*/
130+
const res = {
131+
open: async (ctx, server) => {
132+
const req = {
133+
headers: Object.fromEntries(ctx.data.ctx.headers.entries()),
134+
log: log,
135+
query: {}
136+
}
137+
for (const [key, value] of (new URL(ctx.data.ctx.url)).searchParams) {
138+
if (!(key in req.query)) {
139+
req.query[key] = value
140+
continue
141+
}
142+
req.query[key] = Array.isArray(req.query[key])
143+
? [...req.query[key], value]
144+
: [req.query[key], value]
145+
}
146+
147+
ctx.closeEmitter = new EventEmitter()
148+
ctx.closeEmitter.send = ctx.send.bind(ctx)
149+
150+
const ws = {
151+
socket: ctx.closeEmitter
152+
}
153+
154+
const result = handler(ws, { query: req.query })
155+
if (result && result.then) {
156+
await result
157+
}
158+
},
159+
close: (ctx) => { ctx.closeEmitter.emit('close') }
160+
}
161+
return res
162+
}
163+
164+
module.exports = {
165+
wrapper,
166+
wsWrapper
167+
}

lib/db/clickhouse.js

+1
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ const queryTempoScanV2 = async function (query) {
455455
}
456456
const limit = query.limit ? `LIMIT ${parseInt(query.limit)}` : ''
457457
const sql = `${select} ${from} WHERE ${where.join(' AND ')} ORDER BY timestamp_ns DESC ${limit} FORMAT JSON`
458+
console.log(sql)
458459
const resp = await rawRequest(sql, null, process.env.CLICKHOUSE_DB || 'cloki')
459460
return resp.data.data ? resp.data.data : JSON.parse(resp.data).data
460461
}

lib/db/zipkin.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@ module.exports = class {
2626
* @returns {string}
2727
*/
2828
toJson () {
29-
return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val)
29+
const res = {
30+
...this,
31+
timestamp_ns: this.timestamp_ns.toString(),
32+
duration_ns: this.duration_ns.toString()
33+
}
34+
return JSON.stringify(res)
35+
//return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val)
3036
}
3137

3238
/**

lib/handlers/404.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
function handler (req, res) {
22
req.log.debug('unsupported', req.url)
3-
return res.send('404 Not Supported')
3+
return res.code(404).send('404 Not Supported')
44
}
55

66
module.exports = handler

lib/handlers/datadog_log_push.js

+11-7
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,24 @@
1818
*/
1919

2020
const stringify = require('../utils').stringify
21+
const DATABASE = require('../db/clickhouse')
22+
const { bulk_labels, bulk, labels } = DATABASE.cache
23+
const { fingerPrint } = require('../utils')
24+
const { readonly } = require('../../common')
25+
2126
const tagsToObject = (data, delimiter = ',') =>
2227
Object.fromEntries(data.split(',').map(v => {
2328
const fields = v.split(':')
2429
return [fields[0], fields[1]]
2530
}))
2631

2732
async function handler (req, res) {
28-
const self = this
2933
req.log.debug('Datadog Log Index Request')
3034
if (!req.body) {
3135
req.log.error('No Request Body or Target!')
3236
return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }')
3337
}
34-
if (this.readonly) {
38+
if (readonly) {
3539
req.log.error('Readonly! No push support.')
3640
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
3741
}
@@ -69,18 +73,18 @@ async function handler (req, res) {
6973
}
7074
// Calculate Fingerprint
7175
const strJson = stringify(JSONLabels)
72-
finger = self.fingerPrint(strJson)
76+
finger = fingerPrint(strJson)
7377
// Store Fingerprint
74-
promises.push(self.bulk_labels.add([[
78+
promises.push(bulk_labels.add([[
7579
new Date().toISOString().split('T')[0],
7680
finger,
7781
strJson,
7882
JSONLabels.target || ''
7983
]]))
8084
for (const key in JSONLabels) {
8185
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
82-
self.labels.add('_LABELS_', key)
83-
self.labels.add(key, JSONLabels[key])
86+
labels.add('_LABELS_', key)
87+
labels.add(key, JSONLabels[key])
8488
}
8589
} catch (err) {
8690
req.log.error({ err }, 'failed ingesting datadog log')
@@ -94,7 +98,7 @@ async function handler (req, res) {
9498
stream.message
9599
]
96100
req.log.debug({ finger, values }, 'store')
97-
promises.push(self.bulk.add([values]))
101+
promises.push(bulk.add([values]))
98102
})
99103
}
100104
await Promise.all(promises)

lib/handlers/datadog_series_push.js

+11-8
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,19 @@
2525
2626
*/
2727
const stringify = require('../utils').stringify
28+
const DATABASE = require('../db/clickhouse')
29+
const { bulk_labels, bulk, labels } = DATABASE.cache
30+
const { fingerPrint } = require('../utils')
31+
const { readonly } = require('../../common')
2832

2933
async function handler (req, res) {
30-
const self = this
3134
req.log.debug('Datadog Series Index Request')
3235
if (!req.body) {
3336
req.log.error('No Request Body!')
3437
res.code(500).send()
3538
return
3639
}
37-
if (this.readonly) {
40+
if (readonly) {
3841
req.log.error('Readonly! No push support.')
3942
res.code(500).send()
4043
return
@@ -63,18 +66,18 @@ async function handler (req, res) {
6366
}
6467
// Calculate Fingerprint
6568
const strJson = stringify(JSONLabels)
66-
finger = self.fingerPrint(strJson)
67-
self.labels.add(finger.toString(), stream.labels)
69+
finger = fingerPrint(strJson)
70+
labels.add(finger.toString(), stream.labels)
6871
// Store Fingerprint
69-
promises.push(self.bulk_labels.add([[
72+
promises.push(bulk_labels.add([[
7073
new Date().toISOString().split('T')[0],
7174
finger,
7275
strJson,
7376
JSONLabels.__name__ || 'undefined'
7477
]]))
7578
for (const key in JSONLabels) {
76-
self.labels.add('_LABELS_', key)
77-
self.labels.add(key, JSONLabels[key])
79+
labels.add('_LABELS_', key)
80+
labels.add(key, JSONLabels[key])
7881
}
7982
} catch (err) {
8083
req.log.error({ err })
@@ -97,7 +100,7 @@ async function handler (req, res) {
97100
entry.value,
98101
JSONLabels.__name__ || 'undefined'
99102
]
100-
promises.push(self.bulk.add([values]))
103+
promises.push(bulk.add([values]))
101104
})
102105
}
103106
})

lib/handlers/elastic_bulk.js

+13-7
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,18 @@
88

99
const { asyncLogError } = require('../../common')
1010
const stringify = require('../utils').stringify
11+
const DATABASE = require('../db/clickhouse')
12+
const { bulk_labels, bulk, labels } = DATABASE.cache
13+
const { fingerPrint } = require('../utils')
14+
const { readonly } = require('../../common')
1115

1216
async function handler (req, res) {
13-
const self = this
1417
req.log.debug('ELASTIC Bulk Request')
1518
if (!req.body) {
1619
asyncLogError('No Request Body or Target!' + req.body, req.log)
1720
return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }')
1821
}
19-
if (this.readonly) {
22+
if (readonly) {
2023
asyncLogError('Readonly! No push support.', req.log)
2124
return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }')
2225
}
@@ -38,6 +41,9 @@ async function handler (req, res) {
3841
const promises = []
3942
if (streams) {
4043
streams.forEach(function (stream) {
44+
if (!stream) {
45+
return
46+
}
4147
try {
4248
stream = JSON.parse(stream)
4349
} catch (err) { asyncLogError(err, req.log); return };
@@ -67,19 +73,19 @@ async function handler (req, res) {
6773
}
6874
// Calculate Fingerprint
6975
const strJson = stringify(JSONLabels)
70-
finger = self.fingerPrint(strJson)
76+
finger = fingerPrint(strJson)
7177
req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT')
7278
// Store Fingerprint
73-
promises.push(self.bulk_labels.add([[
79+
promises.push(bulk_labels.add([[
7480
new Date().toISOString().split('T')[0],
7581
finger,
7682
strJson,
7783
JSONLabels.target || ''
7884
]]))
7985
for (const key in JSONLabels) {
8086
req.log.debug({ key, data: JSONLabels[key] }, 'Storing label')
81-
self.labels.add('_LABELS_', key)
82-
self.labels.add(key, JSONLabels[key])
87+
labels.add('_LABELS_', key)
88+
labels.add(key, JSONLabels[key])
8389
}
8490
} catch (err) {
8591
asyncLogError(err, req.log)
@@ -93,7 +99,7 @@ async function handler (req, res) {
9399
JSON.stringify(stream) || stream
94100
]
95101
req.log.debug({ finger, values }, 'store')
96-
promises.push(self.bulk.add([values]))
102+
promises.push(bulk.add([values]))
97103

98104
// Reset State, Expect Command
99105
lastTags = false

0 commit comments

Comments
 (0)