diff --git a/docker-compose.yml b/docker-compose.yml index 3171c1b..8ebe091 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,3 +22,23 @@ services: - "3001:3001" depends_on: - nats + prometheus: + image: prom/prometheus:v2.53.4 + container_name: prometheus + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + ports: + - "9090:9090" + command: + - '--config.file=/etc/prometheus/prometheus.yml' + grafana: + image: grafana/grafana:11.6.0 + container_name: grafana + ports: + - "3002:3000" + environment: + - "GF_SECURITY_ADMIN_USER=admin" + - "GF_SECURITY_ADMIN_PASSWORD=admin" + volumes: + - ./grafana/provisioning:/etc/grafana/provisioning + - ./grafana/dashboards:/var/lib/grafana/dashboards diff --git a/go.mod b/go.mod index 3928234..c62dd62 100644 --- a/go.mod +++ b/go.mod @@ -6,17 +6,26 @@ require ( github.com/goccy/go-yaml v1.17.1 github.com/gorilla/websocket v1.5.3 github.com/nats-io/nats.go v1.40.1 + github.com/prometheus/client_golang v1.21.1 github.com/stretchr/testify v1.10.0 ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/klauspost/compress v1.18.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nats-io/nkeys v0.4.9 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.62.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/stretchr/objx v0.5.2 // indirect golang.org/x/crypto v0.31.0 // indirect golang.org/x/sys v0.28.0 // indirect + google.golang.org/protobuf v1.36.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 709f2a9..b286c29 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,26 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/goccy/go-yaml v1.17.1 h1:LI34wktB2xEE3ONG/2Ar54+/HJVBriAGJ55PHls4YuY= github.com/goccy/go-yaml v1.17.1/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nats-io/nats.go v1.40.1 h1:MLjDkdsbGUeCMKFyCFoLnNn/HDTqcgVa3EQm+pMNDPk= github.com/nats-io/nats.go v1.40.1/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= @@ -14,6 +29,16 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.21.1 h1:DOvXXTqVzvkIewV/CDPFdejpMCGeMcbGCQ8YOmu+Ibk= +github.com/prometheus/client_golang v1.21.1/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= @@ -22,7 +47,10 @@ golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/grafana/dashboards/dashboard.json b/grafana/dashboards/dashboard.json new file mode 100644 index 0000000..e6ac98e --- /dev/null +++ b/grafana/dashboards/dashboard.json @@ -0,0 +1,615 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "datasource", + "uid": "grafana" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "description": "Dashboard for monitoring WebSocket metrics", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 1, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "minVizHeight": 75, + "minVizWidth": 75, + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "sizing": "auto" + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "editorMode": "code", + "expr": "websocket_connections{}", + "interval": "", + "legendFormat": "{{endpoint}}", + "range": true, + "refId": "A" + } + ], + "title": "Active WebSocket Connections", + "type": "gauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 6, + "y": 0 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": true + }, + "showPercentChange": false, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "editorMode": "code", + "exemplar": false, + "expr": "websocket_messages_received_total{}", + "format": "time_series", + "instant": true, + "interval": "", + "legendFormat": "{{endpoint}}", + "range": false, + "refId": "A" + } + ], + "title": "Messages Received", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 12, + "y": 0 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": true + }, + "showPercentChange": false, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "editorMode": "code", + "exemplar": false, + "expr": "websocket_messages_published_total{}", + "format": "time_series", + "instant": true, + "interval": "", + "legendFormat": "{{endpoint}}", + "range": false, + "refId": "A" + } + ], + "title": "Messages Published", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 18, + "y": 0 + }, + "id": 5, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "sum" + ], + "fields": "", + "values": true + }, + "showPercentChange": false, + "showThresholdLabels": false, + "showThresholdMarkers": true, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "editorMode": "code", + "exemplar": false, + "expr": "websocket_publish_errors_total{}", + "format": "time_series", + "instant": true, + "interval": "", + "legendFormat": "{{endpoint}}", + "range": false, + "refId": "A" + } + ], + "title": "Publish Errors", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "always", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 0, + "y": 8 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "editorMode": "code", + "exemplar": false, + "expr": "histogram_quantile(0.95, sum(rate(websocket_message_latency_seconds_bucket[1m])) by (le, endpoint))", + "format": "time_series", + "instant": false, + "interval": "", + "legendFormat": "{{endpoint}}", + "range": true, + "refId": "A" + } + ], + "title": "WebSocket Latency (95th Percentile)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 0.1 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 8, + "y": 8 + }, + "id": 7, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "editorMode": "code", + "expr": "sum(websocket_message_latency_seconds_sum) by (endpoint) / sum(websocket_message_latency_seconds_count) by (endpoint)", + "format": "time_series", + "instant": false, + "interval": "", + "legendFormat": "{{endpoint}}", + "range": true, + "refId": "A" + } + ], + "title": "Real-Time WebSocket Latency by Endpoint", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "orange", + "value": 0.05 + }, + { + "color": "red", + "value": 0.1 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 8, + "x": 16, + "y": 8 + }, + "id": 6, + "options": { + "displayMode": "gradient", + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false + }, + "maxVizHeight": 300, + "minVizHeight": 10, + "minVizWidth": 0, + "namePlacement": "auto", + "orientation": "horizontal", + "reduceOptions": { + "calcs": [ + "mean" + ], + "fields": "", + "values": false + }, + "showUnfilled": true, + "sizing": "auto", + "text": {}, + "valueMode": "color" + }, + "pluginVersion": "11.6.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "editorMode": "code", + "expr": "sum(websocket_message_latency_seconds_sum) by (endpoint) / sum(websocket_message_latency_seconds_count) by (endpoint)", + "instant": true, + "legendFormat": "{{endpoint}}", + "range": false, + "refId": "A" + } + ], + "title": "Average WebSocket Latency by Endpoint", + "type": "bargauge" + } + ], + "preload": false, + "refresh": "5s", + "schemaVersion": 41, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Real Time Messaging Metrics Dashboard", + "uid": "realtime-metrics-dashboard", + "version": 7 +} \ No newline at end of file diff --git a/grafana/provisioning/dashboards/dashboard.yml b/grafana/provisioning/dashboards/dashboard.yml new file mode 100644 index 0000000..0bcf3d8 --- /dev/null +++ b/grafana/provisioning/dashboards/dashboard.yml @@ -0,0 +1,11 @@ +apiVersion: 1 + +providers: + - name: 'default' + orgId: 1 + folder: '' + type: file + disableDeletion: false + editable: true + options: + path: /var/lib/grafana/dashboards diff --git a/grafana/provisioning/datasources/datasource.yml b/grafana/provisioning/datasources/datasource.yml new file mode 100644 index 0000000..86fd346 --- /dev/null +++ b/grafana/provisioning/datasources/datasource.yml @@ -0,0 +1,8 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 0000000..a6683dc --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,85 @@ +package metrics + +import ( + "net/http" + "sync" + + "github.com/gorilla/websocket" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + register sync.Once + + websocketConnections = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "websocket_connections", + Help: "Number of active WebSocket connections", + }, + []string{"endpoint"}, + ) + + MessagesReceived = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "websocket_messages_received_total", + Help: "Total number of messages received via WebSocket", + }, + []string{"endpoint"}, + ) + + MessagesPublished = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "websocket_messages_published_total", + Help: "Total number of messages successfully published", + }, + []string{"endpoint"}, + ) + + PublishErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "websocket_publish_errors_total", + Help: "Total number of publish errors", + }, + []string{"endpoint"}, + ) + + WebsocketLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "websocket_message_latency_seconds", + Help: "Latency of WebSocket message processing", + Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10}, + }, + []string{"endpoint"}, + ) +) + +func InitMetrics() { + register.Do(func() { + prometheus.MustRegister(websocketConnections) + prometheus.MustRegister(MessagesReceived) + prometheus.MustRegister(MessagesPublished) + prometheus.MustRegister(PublishErrors) + prometheus.MustRegister(WebsocketLatency) + }) +} + +func PrometheusMiddleware(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + endpoint := r.URL.Path + + isWebSocket := websocket.IsWebSocketUpgrade(r) + if isWebSocket { + websocketConnections.WithLabelValues(endpoint).Inc() + next.ServeHTTP(w, r) + websocketConnections.WithLabelValues(endpoint).Dec() + return + } + + next.ServeHTTP(w, r) + } +} + +func PromHandler() http.HandlerFunc { + return promhttp.Handler().ServeHTTP +} diff --git a/prometheus.yml b/prometheus.yml new file mode 100644 index 0000000..3b3519c --- /dev/null +++ b/prometheus.yml @@ -0,0 +1,13 @@ +global: + scrape_interval: 15s + +scrape_configs: + - job_name: "publisher_service" + metrics_path: /metrics + static_configs: + - targets: ["publisher:3000"] + + - job_name: "subscriber_service" + metrics_path: /metrics + static_configs: + - targets: ["subscriber:3001"] diff --git a/publisher/cmd/publisher/main.go b/publisher/cmd/publisher/main.go index 2b47054..98af4ff 100644 --- a/publisher/cmd/publisher/main.go +++ b/publisher/cmd/publisher/main.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/iulian509/realtime-messaging/config" + "github.com/iulian509/realtime-messaging/internal/metrics" "github.com/iulian509/realtime-messaging/publisher/internal/handlers" "github.com/iulian509/realtime-messaging/publisher/internal/mq" ) @@ -20,12 +21,18 @@ func main() { log.Fatalf("failed to connect to NATS server: %v", err) } + metrics.InitMetrics() + deps := &handlers.Dependencies{ PublisherClient: publisherClient, } - http.HandleFunc("/publish", deps.PublisherHandler) - log.Println("publisher service running on :3000") + http.HandleFunc("/metrics", metrics.PromHandler()) + http.HandleFunc("/publish", metrics.PrometheusMiddleware(deps.PublisherHandler)) + err = http.ListenAndServe(":3000", nil) - log.Fatalf("failed to start publisher service: %v", err) + log.Println("publisher service running on :3000") + if err != nil { + log.Fatalf("failed to start publisher service: %v", err) + } } diff --git a/publisher/internal/handlers/publisher.go b/publisher/internal/handlers/publisher.go index 1ed5d95..7a2b144 100644 --- a/publisher/internal/handlers/publisher.go +++ b/publisher/internal/handlers/publisher.go @@ -4,8 +4,10 @@ import ( "context" "log" "net/http" + "time" "github.com/gorilla/websocket" + "github.com/iulian509/realtime-messaging/internal/metrics" "github.com/iulian509/realtime-messaging/publisher/internal/mq" iw "github.com/iulian509/realtime-messaging/internal/websocket" @@ -34,6 +36,8 @@ func (deps *Dependencies) PublisherHandler(w http.ResponseWriter, r *http.Reques } func processMessages(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn, publisherClient *mq.Publisher) { + const endpoint = "/publish" + for { select { case <-ctx.Done(): @@ -50,13 +54,21 @@ func processMessages(ctx context.Context, cancel context.CancelFunc, conn *webso return } log.Printf("received message: %s", message) + metrics.MessagesReceived.WithLabelValues(endpoint).Inc() + + startTime := time.Now() err = publisherClient.PublishMessage(message) if err != nil { log.Printf("error publishing message: %v", err) + metrics.PublishErrors.WithLabelValues(endpoint).Inc() } else { log.Printf("published message: %s", message) + metrics.MessagesPublished.WithLabelValues(endpoint).Inc() } + + latency := time.Since(startTime).Seconds() + metrics.WebsocketLatency.WithLabelValues(endpoint).Observe(latency) } } } diff --git a/subscriber/cmd/subscriber/main.go b/subscriber/cmd/subscriber/main.go index 732907b..e004528 100644 --- a/subscriber/cmd/subscriber/main.go +++ b/subscriber/cmd/subscriber/main.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/iulian509/realtime-messaging/config" + "github.com/iulian509/realtime-messaging/internal/metrics" "github.com/iulian509/realtime-messaging/subscriber/internal/handlers" "github.com/iulian509/realtime-messaging/subscriber/internal/mq" ) @@ -20,12 +21,18 @@ func main() { log.Fatalf("failed to connect to NATS server: %v", err) } + metrics.InitMetrics() + deps := &handlers.Dependencies{ SubscriberClient: subscriberClient, } - http.HandleFunc("/subscribe", deps.SubscriberHandler) - log.Println("subscriber service running on :3001") + http.HandleFunc("/metrics", metrics.PromHandler()) + http.HandleFunc("/subscribe", metrics.PrometheusMiddleware(deps.SubscriberHandler)) + err = http.ListenAndServe(":3001", nil) - log.Fatalf("failed to start subscriber service: %v", err) + log.Println("subscriber service running on :3001") + if err != nil { + log.Fatalf("failed to start subscriber service: %v", err) + } } diff --git a/subscriber/internal/handlers/subscriber.go b/subscriber/internal/handlers/subscriber.go index b7b2508..45bb6bf 100644 --- a/subscriber/internal/handlers/subscriber.go +++ b/subscriber/internal/handlers/subscriber.go @@ -4,11 +4,13 @@ import ( "context" "log" "net/http" + "time" "github.com/gorilla/websocket" "github.com/iulian509/realtime-messaging/subscriber/internal/mq" "github.com/nats-io/nats.go" + "github.com/iulian509/realtime-messaging/internal/metrics" iw "github.com/iulian509/realtime-messaging/internal/websocket" ) @@ -36,16 +38,24 @@ func (deps *Dependencies) SubscriberHandler(w http.ResponseWriter, r *http.Reque func processMessages(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn, subscriberClient *mq.Subscriber) { const subject = "subject" + const endpoint = "/subscribe" subscription, err := subscriberClient.Subscribe(subject, func(msg *nats.Msg) { + startTime := time.Now() + log.Printf("received message on [%s]: %s", msg.Subject, string(msg.Data)) + metrics.MessagesReceived.WithLabelValues(endpoint).Inc() err := conn.WriteMessage(websocket.TextMessage, msg.Data) if err != nil { log.Printf("failed to send message to WebSocket: %v", err) + metrics.PublishErrors.WithLabelValues(endpoint).Inc() cancel() return } + metrics.MessagesPublished.WithLabelValues(endpoint).Inc() + latency := time.Since(startTime).Seconds() + metrics.WebsocketLatency.WithLabelValues(endpoint).Observe(latency) }) if err != nil { log.Printf("failed to subscribe to subject %q: %v", subject, err)