
Commit 158a8d8

feat: add Prometheus support (#74)
- docs: doc around metrics
- feat: added prometheus dependency
- define metric instrument interface
- feat: init prometheus collector
- feat: implement stubs for metric instrument
- feat: interface established for metrics
- update gauge to type interface for less type casting
- update server.go and handler.go to use new instrumentation scheme
- feat: update to make instrumentation static
- changes in metric instrument signature
- feat: add cast dependency for converting values reliably
- feat: implement interface methods
- feat: add cast dependency update
- feat: implement prometheus methods
- feat: run server in a go routine
- fix: correct error return format
- fix: add missing label for count metric
- feat: added log for metric server shut down
- feat: convert decrement counter to a different metric for calculation during observation
- feat: removed unused decrement counter
- feat: bump raccoon to version 1.15
- update sample env files to include new metric config
- feat: bump to version 1.16 for golang
- bump golang version to 1.18
- fix: remove usage of errors.Join since it requires go >= 1.20
- feat: bump docker golang version to 1.20
- revert: docker changes for protoc
- feat: bump golang version to 1.18 in go.mod
- fix: label inconsistencies
- feat: update setup-go version
- feat: set prometheus as the default for metrics in test
- feat: remove telegraf dependency from docker compose
- feat: added setup go
- run in same image
- feat: added config values documentation
- feat: upgrade running image to debian:bookworm
- feat: added buckets according to approximate metric ranges
- update docs with missing metrics
- refactor: set unused locally rather than using mute options
- refactor: formatting and uniformity changes
- feat: add logging for any error encountered during metric scrape
- fix: early return while registering metrics
- feat: remove 1 as bucket resolution due to low probability of the bucket filling
- feat: add interface abstraction for mocking
- feat: added prometheus tests
- introduce delay in metric server initialisation
- feat: added metric tests
- feat: add metric test
- remove test for statsD setup
- feat: move to config instead of loading from env
- refactor: refactor statsD implementation
- refactor: cleanup metrics.md and fix issues
- added documentation
- feat: improved text on logging and help of metrics
- feat: update go.mod
- feat: add common config for recording runtime stats
- record err in case of casting error
- feat: add support for error when a value cannot be cast; added tests for the same
- fix: metric name fetch
1 parent db7f808 commit 158a8d8

29 files changed (+1381, -324 lines)
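Several of the commits above revolve around a metric instrument interface that lets StatsD and Prometheus backends be swapped behind one set of static functions. A minimal sketch of that idea follows; the interface and type names here are illustrative, not the actual Raccoon API:

```go
package main

import "fmt"

// MetricInstrument is a hypothetical abstraction of the kind the
// commit introduces: one interface that both a statsd client and a
// prometheus collector could implement.
type MetricInstrument interface {
	Count(metricName string, count int64, labels map[string]string) error
	Gauge(metricName string, value interface{}, labels map[string]string) error
	Histogram(metricName string, value int64, labels map[string]string) error
}

// consoleInstrument is a trivial backend used only for illustration;
// it prints each observation instead of exporting it.
type consoleInstrument struct{}

func (c consoleInstrument) Count(name string, n int64, labels map[string]string) error {
	fmt.Printf("count %s += %d %v\n", name, n, labels)
	return nil
}

func (c consoleInstrument) Gauge(name string, v interface{}, labels map[string]string) error {
	fmt.Printf("gauge %s = %v %v\n", name, v, labels)
	return nil
}

func (c consoleInstrument) Histogram(name string, v int64, labels map[string]string) error {
	fmt.Printf("histogram %s observe %d %v\n", name, v, labels)
	return nil
}

func main() {
	// callers depend only on the interface, never on the backend
	var m MetricInstrument = consoleInstrument{}
	m.Count("kafka_messages_delivered_total", 1, map[string]string{"success": "true"})
}
```

Returning an error from each method matches the "add support for error in case of unable to cast" commits: a backend that must coerce `interface{}` values can report a cast failure instead of silently dropping the sample.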

.env.sample (+5, -1)

@@ -17,6 +17,8 @@ SERVER_CORS_ALLOWED_HEADERS=""
 
 SERVER_GRPC_PORT=8081
+
+
 WORKER_BUFFER_CHANNEL_SIZE=5
 WORKER_BUFFER_FLUSH_TIMEOUT_MS=5000
 WORKER_POOL_SIZE=5
@@ -32,7 +34,9 @@ PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
 PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
 PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000
 
+METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS=1000
+METRIC_PROMETHEUS_ENABLED="true"
 METRIC_STATSD_ADDRESS=":8125"
-METRIC_STATSD_FLUSH_PERIOD_MS=100
+METRIC_STATSD_FLUSH_PERIOD_MS=1000
 
 LOG_LEVEL="info"

.env.test (+3, -1)

@@ -33,7 +33,9 @@ PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
 PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
 PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000
 
+METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS=1000
+METRIC_PROMETHEUS_ENABLED="true"
 METRIC_STATSD_ADDRESS=":8125"
-METRIC_STATSD_FLUSH_PERIOD_MS=100
+METRIC_STATSD_FLUSH_PERIOD_MS=1000
 
 LOG_LEVEL="info"

.github/workflows/build.yaml (+4, -4)

@@ -9,9 +9,9 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Setup Go
-        uses: actions/setup-go@v2.1.3
+        uses: actions/setup-go@v3
         with:
-          go-version: "1.14"
+          go-version: "1.18"
       - name: Checkout repo
         uses: actions/checkout@v2
         with:
@@ -25,9 +25,9 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Setup Go
-        uses: actions/setup-go@v2.1.3
+        uses: actions/setup-go@v3
         with:
-          go-version: "1.14"
+          go-version: "1.18"
       - name: Install Protoc
         uses: arduino/setup-protoc@v1
       - uses: actions/checkout@v2

.github/workflows/integration-test.yaml (+4)

@@ -14,6 +14,10 @@ jobs:
         uses: arduino/setup-protoc@v1
       - name: Checkout repo
         uses: actions/checkout@v2
+      - name: Setup Go
+        uses: actions/setup-go@v3
+        with:
+          go-version: "1.18"
       - name: Copy integration config
         run: cp .env.test .env
       - name: Run Raccoon

Dockerfile (+3, -2)

@@ -1,4 +1,4 @@
-FROM golang:1.14
+FROM golang:1.18
 
 WORKDIR /app
 RUN apt-get update && apt-get install unzip --no-install-recommends --assume-yes
@@ -10,7 +10,8 @@ RUN PROTOC_ZIP=protoc-3.17.3-linux-x86_64.zip && \
 COPY . .
 RUN make build
 
-FROM debian:buster-slim
+
+FROM debian:bookworm-slim
 WORKDIR /app
 COPY --from=0 /app/raccoon ./raccoon
 COPY . .

README.md (+1, -1)

@@ -77,7 +77,7 @@ You can consume the published events from the host machine by using `localhost:9
 
 Prerequisite:
 
-- You need to have [GO](https://golang.org/) 1.14 or above installed
+- You need to have [GO](https://golang.org/) 1.18 or above installed
 - You need `protoc` [installed](https://github.com/protocolbuffers/protobuf#protocol-compiler-installation)
 
 ```sh

app/server.go (+11, -12)

@@ -63,7 +63,7 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
 			   Until then we fall back to approximation */
 			eventsInChannel := len(bufferChannel) * 7
 			logger.Info(fmt.Sprintf("Outstanding unprocessed events in the channel, data lost ~ (No batches %d * 5 events) = ~%d", len(bufferChannel), eventsInChannel))
-			metrics.Count("kafka_messages_delivered_total", eventsInChannel+eventsInProducer, "success=false")
+			metrics.Count("kafka_messages_delivered_total", int64(eventsInChannel+eventsInProducer), map[string]string{"success": "false", "conn_group": "NA", "event_type": "NA"})
 			logger.Info("Exiting server")
 			cancel()
 		default:
@@ -73,20 +73,19 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
 }
 
 func reportProcMetrics() {
-	t := time.Tick(config.MetricStatsd.FlushPeriodMs)
+	t := time.Tick(config.MetricInfo.RuntimeStatsRecordInterval)
 	m := &runtime.MemStats{}
 	for {
 		<-t
-		metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), "")
-
+		metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), map[string]string{})
 		runtime.ReadMemStats(m)
-		metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, "")
-		metrics.Gauge("server_mem_heap_inuse_bytes_current", m.HeapInuse, "")
-		metrics.Gauge("server_mem_heap_objects_total_current", m.HeapObjects, "")
-		metrics.Gauge("server_mem_stack_inuse_bytes_current", m.StackInuse, "")
-		metrics.Gauge("server_mem_gc_triggered_current", m.LastGC/1000, "")
-		metrics.Gauge("server_mem_gc_pauseNs_current", m.PauseNs[(m.NumGC+255)%256]/1000, "")
-		metrics.Gauge("server_mem_gc_count_current", m.NumGC, "")
-		metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, "")
+		metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, map[string]string{})
+		metrics.Gauge("server_mem_heap_inuse_bytes_current", m.HeapInuse, map[string]string{})
+		metrics.Gauge("server_mem_heap_objects_total_current", m.HeapObjects, map[string]string{})
+		metrics.Gauge("server_mem_stack_inuse_bytes_current", m.StackInuse, map[string]string{})
+		metrics.Gauge("server_mem_gc_triggered_current", m.LastGC/1000, map[string]string{})
+		metrics.Gauge("server_mem_gc_pauseNs_current", m.PauseNs[(m.NumGC+255)%256]/1000, map[string]string{})
+		metrics.Gauge("server_mem_gc_count_current", m.NumGC, map[string]string{})
+		metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, map[string]string{})
 	}
 }
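The visible change in this diff is the label signature: the old calls passed a statsd-style tag string (`"success=false"`, `""`), while the new calls pass a `map[string]string`, which maps directly onto Prometheus label sets. A stdlib-only toy illustrating why a canonical label map is needed — not the actual raccoon `metrics` package — might look like this:

```go
package main

import (
	"fmt"
	"runtime"
	"sort"
)

// gauges is a toy in-memory store keyed by metric name plus a
// canonical serialization of the label map.
var gauges = map[string]float64{}

// key serializes labels in sorted order so that the same label set
// always maps to the same series, regardless of map iteration order.
func key(name string, labels map[string]string) string {
	ks := make([]string, 0, len(labels))
	for k := range labels {
		ks = append(ks, k)
	}
	sort.Strings(ks)
	s := name
	for _, k := range ks {
		s += fmt.Sprintf("|%s=%s", k, labels[k])
	}
	return s
}

// Gauge mimics the metrics.Gauge(name, value, labels) shape above;
// the interface{} value covers the int/uint32/uint64 mix that
// runtime.MemStats produces (the commit pulls in a cast dependency
// for exactly this kind of conversion).
func Gauge(name string, value interface{}, labels map[string]string) {
	switch v := value.(type) {
	case int:
		gauges[key(name, labels)] = float64(v)
	case uint32:
		gauges[key(name, labels)] = float64(v)
	case uint64:
		gauges[key(name, labels)] = float64(v)
	}
}

func main() {
	Gauge("server_go_routines_count_current", runtime.NumGoroutine(), map[string]string{})
	m := &runtime.MemStats{}
	runtime.ReadMemStats(m)
	Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, map[string]string{})
	fmt.Println(len(gauges)) // 2 distinct series
}
```
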

config/load.go (+3)

@@ -21,13 +21,16 @@ func Load() {
 	viper.ReadInConfig()
 
 	logConfigLoader()
+
 	publisherKafkaConfigLoader()
 	serverConfigLoader()
 	serverWsConfigLoader()
 	serverGRPCConfigLoader()
 	serverCorsConfigLoader()
 	workerConfigLoader()
+	metricCommonConfigLoader()
 	metricStatsdConfigLoader()
+	metricPrometheusConfigLoader()
 	eventDistributionConfigLoader()
 	eventConfigLoader()
 }

config/metric.go (+33)

@@ -9,17 +9,50 @@ import (
 )
 
 var MetricStatsd metricStatsdCfg
+var MetricPrometheus metricPrometheusCfg
+var MetricInfo metricInfoCfg
 
 type metricStatsdCfg struct {
+	Enabled       bool
 	Address       string
 	FlushPeriodMs time.Duration
 }
 
+type metricPrometheusCfg struct {
+	Enabled bool
+	Port    int
+	Path    string
+}
+
+type metricInfoCfg struct {
+	RuntimeStatsRecordInterval time.Duration
+}
+
 func metricStatsdConfigLoader() {
+	viper.SetDefault("METRIC_STATSD_ENABLED", false)
 	viper.SetDefault("METRIC_STATSD_ADDRESS", ":8125")
 	viper.SetDefault("METRIC_STATSD_FLUSH_PERIOD_MS", 10000)
 	MetricStatsd = metricStatsdCfg{
+		Enabled:       util.MustGetBool("METRIC_STATSD_ENABLED"),
 		Address:       util.MustGetString("METRIC_STATSD_ADDRESS"),
 		FlushPeriodMs: util.MustGetDuration("METRIC_STATSD_FLUSH_PERIOD_MS", time.Millisecond),
 	}
 }
+
+func metricPrometheusConfigLoader() {
+	viper.SetDefault("METRIC_PROMETHEUS_ENABLED", false)
+	viper.SetDefault("METRIC_PROMETHEUS_PORT", 9090)
+	viper.SetDefault("METRIC_PROMETHEUS_PATH", "/metrics")
+	MetricPrometheus = metricPrometheusCfg{
+		Enabled: util.MustGetBool("METRIC_PROMETHEUS_ENABLED"),
+		Port:    util.MustGetInt("METRIC_PROMETHEUS_PORT"),
+		Path:    util.MustGetString("METRIC_PROMETHEUS_PATH"),
+	}
+}
+
+func metricCommonConfigLoader() {
+	viper.SetDefault("METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS", 10000)
+	MetricInfo = metricInfoCfg{
+		RuntimeStatsRecordInterval: util.MustGetDuration("METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS", time.Millisecond),
+	}
+}

docker-compose.yml (+12, -10)

@@ -3,6 +3,7 @@ version: '3.9'
 networks:
   cs-network:
 
+
 services:
   zookeeper:
     image: confluentinc/cp-zookeeper:5.1.2
@@ -41,14 +42,14 @@ services:
   cs:
     build:
       context: .
-    command: ["/bin/sh", "-c", "./raccoon"]
+    command: [ "/bin/sh", "-c", "./raccoon" ]
     hostname: cs
     container_name: cs
     stdin_open: true
     tty: true
     depends_on:
       - kafka
-      - telegraf
+      # - telegraf
     environment:
       SERVER_WEBSOCKET_PORT: "8080"
       SERVER_WEBSOCKET_CHECK_ORIGIN: "true"
@@ -74,6 +75,7 @@ services:
       PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS: 5000
       PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES: 100000
       PUBLISHER_KAFKA_FLUSH_INTERVAL_MS: 1000
+      METRIC_PROMETHEUS_ENABLED: "true"
       METRIC_STATSD_ADDRESS: "telegraf:8125"
       METRIC_STATSD_FLUSH_PERIOD_MS: 100
       LOG_LEVEL: "info"
@@ -82,11 +84,11 @@ services:
       - "8081:8081"
     networks:
       - cs-network
-  telegraf:
-    image: telegraf
-    volumes:
-      - ./.telegraf.sample.conf:/etc/telegraf/telegraf.conf:ro
-    ports:
-      - "8125:8125"
-    networks:
-      - cs-network
+  # telegraf:
+  #   image: telegraf
+  #   volumes:
+  #     - ./.telegraf.sample.conf:/etc/telegraf/telegraf.conf:ro
+  #   ports:
+  #     - "8125:8125"
+  #   networks:
+  #     - cs-network

docs/docs/reference/configurations.md (+35)

@@ -249,6 +249,20 @@ Upon shutdown, the publisher will try to finish processing events in buffer befo
 
 ## Metric
 
+### `METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS`
+
+The time interval between recordings of the application's runtime stats in the instrumentation. It is recommended to keep this value equal to the flush interval when using statsd, and to your collector's scrape interval when using prometheus.
+
+- Type `Optional`
+- Default value: `10000`
+
+### `METRIC_STATSD_ENABLED`
+
+Flag to enable export of statsd metrics.
+
+- Type `Optional`
+- Default value: `false`
+
 ### `METRIC_STATSD_ADDRESS`
 
 Address to reports the service metrics.
@@ -263,6 +277,27 @@ Interval for the service to push metrics.
 - Type `Optional`
 - Default value: `10000`
 
+### `METRIC_PROMETHEUS_ENABLED`
+
+Flag to enable a prometheus HTTP server that exposes metrics.
+
+- Type `Optional`
+- Default value: `false`
+
+### `METRIC_PROMETHEUS_PATH`
+
+The path at which the prometheus server serves metrics.
+
+- Type `Optional`
+- Default value: `/metrics`
+
+### `METRIC_PROMETHEUS_PORT`
+
+The port on which the prometheus server listens for metric scrape requests.
+
+- Type `Optional`
+- Default value: `9090`
+
 ## Log
 
 ### `LOG_LEVEL`
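Per the commit messages, the Prometheus endpoint built from `METRIC_PROMETHEUS_PORT` and `METRIC_PROMETHEUS_PATH` runs in its own goroutine and logs when it shuts down. A stdlib-only sketch of that wiring (a real setup would mount `promhttp.Handler()` from `github.com/prometheus/client_golang`; the handler body and helper names here are placeholders):

```go
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"time"
)

// listenAddr builds the listen address from METRIC_PROMETHEUS_PORT
func listenAddr(port int) string { return fmt.Sprintf(":%d", port) }

func main() {
	mux := http.NewServeMux()
	// placeholder for promhttp.Handler() mounted at METRIC_PROMETHEUS_PATH
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "# placeholder exposition")
	})
	srv := &http.Server{Handler: mux}

	// bind an ephemeral port so this sketch runs anywhere;
	// the service itself would use listenAddr(9090)
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	go srv.Serve(ln) // run the metric server in a goroutine

	// scrape it once, as a prometheus collector would
	resp, err := http.Get("http://" + ln.Addr().String() + "/metrics")
	if err != nil {
		log.Fatal(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Print(string(body))

	// graceful stop, with the shutdown log the commit adds
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	srv.Shutdown(ctx)
	log.Println("metric server shut down")
}
```
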

docs/docs/reference/metrics.md (+46, -1)

@@ -53,15 +53,30 @@ Duration of alive connection per session per connection
 - Type: `Timing`
 - Tags: `conn_group=*`
 
+### `conn_close_err_count`
+
+Number of connection close errors encountered
+
+- Type: `Count`
+- Tags: NA
+
 ## Kafka Publisher
 
 ### `kafka_messages_delivered_total`
 
-Number of delivered events to Kafka
+Number of events delivered to Kafka. This metric also contains false increments; to find the true value, take the difference between this metric and `kafka_messages_undelivered_total` for the same tags/labels.
+
+- Type: `Count`
+- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
+
+### `kafka_messages_undelivered_total`
+
+The count of false increments made by `kafka_messages_delivered_total`. To be used in conjunction with the former for accurate accounting.
 
 - Type: `Count`
 - Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
 
+
 ### `kafka_unknown_topic_failure_total`
 
 Number of delivery failure caused by topic does not exist in kafka.
@@ -102,6 +117,29 @@ Broker latency / round-trip time in microseconds
 - Type: `Gauge`
 - Tags: `broker=broker_nodes`
 
+### `ack_event_rtt_ms`
+
+Time taken from the ack function being called by the kafka producer to the ack being processed by the ack handler.
+
+- Type: `Timing`
+- Tags: NA
+
+### `event_rtt_ms`
+
+Time taken from an event being consumed from the queue to it being acked by the ack handler.
+
+- Type: `Timing`
+- Tags: NA
+
+### `kafka_producebulk_tt_ms`
+
+Response time of the produce batch method of the kafka producer
+
+- Type: `Timing`
+- Tags: NA
+
 ## Resource Usage
 
 ### `server_mem_gc_triggered_current`
@@ -178,6 +216,13 @@ Number of events received in requests
 - Type: `Count`
 - Tags: `conn_group=*` `event_type=*`
 
+### `events_duplicate_total`
+
+Number of duplicate events
+
+- Type: `Count`
+- Tags: `conn_group=*` `reason=*`
+
 ### `batches_read_total`
 
 Request count
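The docs above replace a decrement counter with a companion metric: instead of decrementing `kafka_messages_delivered_total` on a false increment, the service bumps `kafka_messages_undelivered_total`, and the consumer reconciles the two at observation time. The arithmetic is just a per-label-set subtraction (a hypothetical helper, not raccoon code):

```go
package main

import "fmt"

// trueDelivered reconciles the delivered counter with its
// false-increment companion, as described in the doc above:
// both values must come from the same tag/label set.
func trueDelivered(delivered, undelivered int64) int64 {
	return delivered - undelivered
}

func main() {
	// e.g. 120 recorded deliveries, 7 of which were false increments
	fmt.Println(trueDelivered(120, 7)) // 113
}
```

In PromQL this is the same idea expressed at query time, e.g. subtracting the two series matched on their shared labels. Counters-only designs like this fit Prometheus better than decrements, since counters are expected to be monotonic.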
