1
- // Copyright 2020-2024 The NATS Authors
1
+ // Copyright 2020-2025 The NATS Authors
2
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
3
// you may not use this file except in compliance with the License.
4
4
// You may obtain a copy of the License at
@@ -17,6 +17,7 @@ import (
17
17
"encoding/json"
18
18
"log"
19
19
"os"
20
+ "sync"
20
21
"time"
21
22
22
23
"github.com/ghodss/yaml"
@@ -30,15 +31,38 @@ import (
30
31
)
31
32
32
33
type Check struct {
33
- Name string `yaml:"name"`
34
- Kind string `yaml:"kind"`
35
- Context string `yaml:"context"`
36
- Properties json.RawMessage `yaml:"properties"`
34
+ Name string `json:"name" yaml:"name"`
35
+ Kind string `json:"kind" yaml:"kind"`
36
+ Context string `json:"context" yaml:"context"`
37
+ ReuseConn bool `json:"reuse_connection" yaml:"reuse_connection"`
38
+ Properties json.RawMessage `json:"properties" yaml:"properties"`
39
+ nc * nats.Conn
40
+ mu sync.Mutex
41
+ }
42
+
43
+ func (c * Check ) connect (urls string , opts ... nats.Option ) (* nats.Conn , error ) {
44
+ c .mu .Lock ()
45
+ defer c .mu .Unlock ()
46
+
47
+ if ! c .ReuseConn {
48
+ return nil , nil
49
+ }
50
+
51
+ if c .nc != nil {
52
+ return c .nc , nil
53
+ }
54
+
55
+ opts = append (opts , nats .MaxReconnects (- 1 ))
56
+
57
+ var err error
58
+ c .nc , err = nats .Connect (urls , opts ... )
59
+
60
+ return c .nc , err
37
61
}
38
62
39
63
type Config struct {
40
- Context string `yaml:"context"`
41
- Checks []Check `yaml:"checks"`
64
+ Context string `yaml:"context"`
65
+ Checks []* Check `yaml:"checks"`
42
66
}
43
67
44
68
type Exporter struct {
@@ -130,7 +154,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
130
154
continue
131
155
}
132
156
133
- callCheck (& check , f )
157
+ callCheck (check , f )
134
158
}
135
159
}
136
160
@@ -154,7 +178,16 @@ func (e *Exporter) checkRequest(servers string, natsOpts []nats.Option, jsmOpts
154
178
return
155
179
}
156
180
157
- err = monitor .CheckRequest (servers , natsOpts , result , 5 * time .Second , copts )
181
+ nc , err := check .connect (servers , natsOpts ... )
182
+ if result .CriticalIfErr (err , "connection failed: %v" , err ) {
183
+ return
184
+ }
185
+
186
+ if nc != nil {
187
+ err = monitor .CheckRequestWithConnection (nc , result , 5 * time .Second , copts )
188
+ } else {
189
+ err = monitor .CheckRequest (servers , natsOpts , result , 5 * time .Second , copts )
190
+ }
158
191
result .CriticalIfErr (err , "check failed: %v" , err )
159
192
}
160
193
@@ -176,7 +209,16 @@ func (e *Exporter) checkServer(servers string, natsOpts []nats.Option, jsmOpts [
176
209
return
177
210
}
178
211
179
- err = monitor .CheckServer (servers , natsOpts , result , time .Second , copts )
212
+ nc , err := check .connect (servers , natsOpts ... )
213
+ if result .CriticalIfErr (err , "connection failed: %v" , err ) {
214
+ return
215
+ }
216
+
217
+ if nc != nil {
218
+ err = monitor .CheckServerWithConnection (nc , result , time .Second , copts )
219
+ } else {
220
+ err = monitor .CheckServer (servers , natsOpts , result , time .Second , copts )
221
+ }
180
222
result .CriticalIfErr (err , "check failed: %v" , err )
181
223
}
182
224
@@ -196,7 +238,16 @@ func (e *Exporter) checkJetStream(servers string, natsOpts []nats.Option, jsmOpt
196
238
return
197
239
}
198
240
199
- err = monitor .CheckJetStreamAccount (servers , natsOpts , jsmOpts , result , copts )
241
+ nc , err := check .connect (servers , natsOpts ... )
242
+ if result .CriticalIfErr (err , "connection failed: %v" , err ) {
243
+ return
244
+ }
245
+
246
+ if nc != nil {
247
+ err = monitor .CheckJetStreamAccountWithConnection (nc , jsmOpts , result , copts )
248
+ } else {
249
+ err = monitor .CheckJetStreamAccount (servers , natsOpts , jsmOpts , result , copts )
250
+ }
200
251
result .CriticalIfErr (err , "check failed: %v" , err )
201
252
}
202
253
@@ -207,7 +258,16 @@ func (e *Exporter) checkMeta(servers string, natsOpts []nats.Option, jsmOpts []j
207
258
return
208
259
}
209
260
210
- err = monitor .CheckJetstreamMeta (servers , natsOpts , result , copts )
261
+ nc , err := check .connect (servers , natsOpts ... )
262
+ if result .CriticalIfErr (err , "connection failed: %v" , err ) {
263
+ return
264
+ }
265
+
266
+ if nc != nil {
267
+ err = monitor .CheckJetstreamMetaWithConnection (nc , result , copts )
268
+ } else {
269
+ err = monitor .CheckJetstreamMeta (servers , natsOpts , result , copts )
270
+ }
211
271
result .CriticalIfErr (err , "check failed: %v" , err )
212
272
}
213
273
@@ -218,7 +278,16 @@ func (e *Exporter) checkMessage(servers string, natsOpts []nats.Option, jsmOpts
218
278
return
219
279
}
220
280
221
- err = monitor .CheckStreamMessage (servers , natsOpts , jsmOpts , result , copts )
281
+ nc , err := check .connect (servers , natsOpts ... )
282
+ if result .CriticalIfErr (err , "connection failed: %v" , err ) {
283
+ return
284
+ }
285
+
286
+ if nc != nil {
287
+ err = monitor .CheckStreamMessageWithConnection (nc , jsmOpts , result , copts )
288
+ } else {
289
+ err = monitor .CheckStreamMessage (servers , natsOpts , jsmOpts , result , copts )
290
+ }
222
291
result .CriticalIfErr (err , "check failed: %v" , err )
223
292
}
224
293
@@ -232,7 +301,17 @@ func (e *Exporter) checkKv(servers string, natsOpts []nats.Option, jsmOpts []jsm
232
301
return
233
302
}
234
303
235
- err = monitor .CheckKVBucketAndKey (servers , natsOpts , result , copts )
304
+ nc , err := check .connect (servers , natsOpts ... )
305
+ if result .CriticalIfErr (err , "connection failed: %v" , err ) {
306
+ return
307
+ }
308
+
309
+ if nc != nil {
310
+ err = monitor .CheckKVBucketAndKeyWithConnection (nc , result , copts )
311
+ } else {
312
+ err = monitor .CheckKVBucketAndKey (servers , natsOpts , result , copts )
313
+ }
314
+
236
315
result .CriticalIfErr (err , "check failed: %v" , err )
237
316
}
238
317
@@ -243,7 +322,16 @@ func (e *Exporter) checkConsumer(servers string, natsOpts []nats.Option, jsmOpts
243
322
return
244
323
}
245
324
246
- err = monitor .ConsumerHealthCheck (servers , natsOpts , jsmOpts , result , copts , api .NewDiscardLogger ())
325
+ nc , err := check .connect (servers , natsOpts ... )
326
+ if result .CriticalIfErr (err , "connection failed: %v" , err ) {
327
+ return
328
+ }
329
+
330
+ if nc != nil {
331
+ err = monitor .ConsumerHealthCheckWithConnection (nc , jsmOpts , result , copts , api .NewDiscardLogger ())
332
+ } else {
333
+ err = monitor .ConsumerHealthCheck (servers , natsOpts , jsmOpts , result , copts , api .NewDiscardLogger ())
334
+ }
247
335
result .CriticalIfErr (err , "check failed: %v" , err )
248
336
}
249
337
@@ -254,7 +342,16 @@ func (e *Exporter) checkStream(servers string, natsOpts []nats.Option, jsmOpts [
254
342
return
255
343
}
256
344
257
- err = monitor .CheckStreamHealth (servers , natsOpts , jsmOpts , result , copts , api .NewDiscardLogger ())
345
+ nc , err := check .connect (servers , natsOpts ... )
346
+ if result .CriticalIfErr (err , "connection failed: %v" , err ) {
347
+ return
348
+ }
349
+
350
+ if nc != nil {
351
+ err = monitor .CheckStreamHealthWithConnection (nc , jsmOpts , result , copts , api .NewDiscardLogger ())
352
+ } else {
353
+ err = monitor .CheckStreamHealth (servers , natsOpts , jsmOpts , result , copts , api .NewDiscardLogger ())
354
+ }
258
355
result .CriticalIfErr (err , "check failed: %v" , err )
259
356
}
260
357
0 commit comments