Skip to content

Commit 1a0643c

Browse files
authored
Fix prometheus port and add unit test for opentelemetry (#1093)
* Disable prometheus port when not allowed (#1078) and add unit test for opentelemetry (#1088)
1 parent 66402db commit 1a0643c

File tree

2 files changed

+266
-9
lines changed

2 files changed

+266
-9
lines changed

pkg/observability/opentelemetry.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ package observability
1616
import (
1717
"context"
1818
"fmt"
19+
"net"
1920
"net/http"
21+
"strconv"
2022
"time"
2123

2224
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -36,6 +38,7 @@ var (
3638
labelNodeStatusKey = attribute.Key("node/status")
3739
labelNodeNameKey = attribute.Key("node/name")
3840
labelEventIDKey = attribute.Key("node/event-id")
41+
metricsEndpoint = "/metrics"
3942
)
4043

4144
// Metrics represents the stats for observability
@@ -62,13 +65,14 @@ func InitMetrics(enabled bool, port int) (Metrics, error) {
6265

6366
// Starts an async process to collect golang runtime stats
6467
// go.opentelemetry.io/contrib/instrumentation/runtime
65-
if err = runtime.Start(
66-
runtime.WithMeterProvider(provider),
67-
runtime.WithMinimumReadMemStatsInterval(1*time.Second)); err != nil {
68+
err = runtime.Start(runtime.WithMeterProvider(provider), runtime.WithMinimumReadMemStatsInterval(1*time.Second))
69+
if err != nil {
6870
return Metrics{}, fmt.Errorf("failed to start Go runtime metrics collection: %w", err)
6971
}
7072

71-
go serveMetrics(port)
73+
if enabled {
74+
serveMetrics(port)
75+
}
7276

7377
return metrics, nil
7478
}
@@ -135,10 +139,19 @@ func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) {
135139
}, nil
136140
}
137141

138-
func serveMetrics(port int) {
139-
log.Info().Msgf("Starting to serve handler /metrics, port %d", port)
140-
http.Handle("/metrics", promhttp.Handler())
141-
if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil {
142-
log.Err(err).Msg("Failed to listen and serve http server")
142+
func serveMetrics(port int) *http.Server {
143+
http.Handle(metricsEndpoint, promhttp.Handler())
144+
145+
server := &http.Server{
146+
Addr: net.JoinHostPort("", strconv.Itoa(port)),
143147
}
148+
149+
go func() {
150+
log.Info().Msgf("Starting to serve handler %s, port %d", metricsEndpoint, port)
151+
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
152+
log.Err(err).Msg("Failed to listen and serve http server")
153+
}
154+
}()
155+
156+
return server
144157
}
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package observability
15+
16+
import (
17+
"context"
18+
"errors"
19+
"fmt"
20+
"net"
21+
"net/http"
22+
"net/http/httptest"
23+
"strconv"
24+
"strings"
25+
"testing"
26+
"time"
27+
28+
"github.com/prometheus/client_golang/prometheus/promhttp"
29+
"go.opentelemetry.io/otel/attribute"
30+
"go.opentelemetry.io/otel/exporters/prometheus"
31+
api "go.opentelemetry.io/otel/metric"
32+
"go.opentelemetry.io/otel/sdk/metric"
33+
34+
h "github.com/aws/aws-node-termination-handler/pkg/test"
35+
)
36+
37+
var (
38+
mockNth = "aws.node.termination.handler"
39+
mockErrorEvent = "mockErrorEvent"
40+
mockAction = "cordon-and-drain"
41+
mockNodeName1 = "nodeName1"
42+
mockNodeName2 = "nodeName2"
43+
mockNodeName3 = "nodeName3"
44+
mockEventID1 = "eventID1"
45+
mockEventID2 = "eventID2"
46+
mockEventID3 = "eventID3"
47+
successStatus = "success"
48+
errorStatus = "error"
49+
mockDefaultPort = 9092
50+
mockClosedPort = 9093
51+
)
52+
53+
func TestInitMetrics(t *testing.T) {
54+
getMetrics(t)
55+
56+
responseRecorder := mockMetricsRequest()
57+
58+
validateStatus(t, responseRecorder)
59+
60+
metricsMap := getMetricsMap(responseRecorder.Body.String())
61+
62+
runtimeMetrics := []string{
63+
"go_gc_gogc_percent",
64+
"go_memstats_frees_total",
65+
"go_goroutines",
66+
}
67+
68+
for _, metricName := range runtimeMetrics {
69+
_, exists := metricsMap[metricName]
70+
h.Assert(t, exists, fmt.Sprintf("%v metric should be present", metricName))
71+
}
72+
}
73+
74+
func TestErrorEventsInc(t *testing.T) {
75+
metrics := getMetrics(t)
76+
77+
metrics.ErrorEventsInc(mockErrorEvent)
78+
79+
responseRecorder := mockMetricsRequest()
80+
81+
validateStatus(t, responseRecorder)
82+
83+
metricsMap := getMetricsMap(responseRecorder.Body.String())
84+
85+
validateEventErrorTotal(t, metricsMap, 1)
86+
validateActionTotalV2(t, metricsMap, 0, successStatus)
87+
validateActionTotalV2(t, metricsMap, 0, errorStatus)
88+
}
89+
90+
func TestNodeActionsInc(t *testing.T) {
91+
metrics := getMetrics(t)
92+
93+
metrics.NodeActionsInc(mockAction, mockNodeName1, mockEventID1, nil)
94+
metrics.NodeActionsInc(mockAction, mockNodeName2, mockEventID2, nil)
95+
metrics.NodeActionsInc(mockAction, mockNodeName3, mockEventID3, errors.New("mockError"))
96+
97+
responseRecorder := mockMetricsRequest()
98+
99+
validateStatus(t, responseRecorder)
100+
101+
metricsMap := getMetricsMap(responseRecorder.Body.String())
102+
103+
validateEventErrorTotal(t, metricsMap, 0)
104+
validateActionTotalV2(t, metricsMap, 2, successStatus)
105+
validateActionTotalV2(t, metricsMap, 1, errorStatus)
106+
}
107+
108+
func TestRegisterMetricsWith(t *testing.T) {
109+
const errorEventMetricsTotal = 23
110+
const successActionMetricsTotal = 31
111+
const errorActionMetricsTotal = 97
112+
113+
metrics := getMetrics(t)
114+
115+
errorEventlabels := []attribute.KeyValue{labelEventErrorWhereKey.String(mockErrorEvent)}
116+
successActionlabels := []attribute.KeyValue{labelNodeActionKey.String(mockAction), labelNodeStatusKey.String(successStatus)}
117+
errorActionlabels := []attribute.KeyValue{labelNodeActionKey.String(mockAction), labelNodeStatusKey.String(errorStatus)}
118+
119+
for i := 0; i < errorEventMetricsTotal; i++ {
120+
metrics.errorEventsCounter.Add(context.Background(), 1, api.WithAttributes(errorEventlabels...))
121+
}
122+
for i := 0; i < successActionMetricsTotal; i++ {
123+
metrics.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(successActionlabels...))
124+
}
125+
for i := 0; i < errorActionMetricsTotal; i++ {
126+
metrics.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(errorActionlabels...))
127+
}
128+
129+
responseRecorder := mockMetricsRequest()
130+
131+
validateStatus(t, responseRecorder)
132+
133+
metricsMap := getMetricsMap(responseRecorder.Body.String())
134+
135+
validateEventErrorTotal(t, metricsMap, errorEventMetricsTotal)
136+
validateActionTotalV2(t, metricsMap, successActionMetricsTotal, successStatus)
137+
validateActionTotalV2(t, metricsMap, errorActionMetricsTotal, errorStatus)
138+
}
139+
140+
func TestServeMetrics(t *testing.T) {
141+
server := serveMetrics(mockDefaultPort)
142+
143+
defer func() {
144+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
145+
defer cancel()
146+
if err := server.Shutdown(ctx); err != nil {
147+
t.Errorf("failed to shutdown server: %v", err)
148+
}
149+
}()
150+
151+
time.Sleep(100 * time.Millisecond)
152+
153+
conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", mockDefaultPort), time.Second)
154+
if err != nil {
155+
t.Errorf("server is not listening on port %d: %v", mockDefaultPort, err)
156+
}
157+
conn.Close()
158+
159+
conn, err = net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", mockClosedPort), time.Second)
160+
if err == nil {
161+
conn.Close()
162+
t.Errorf("server should not be listening on port %d: %v", mockClosedPort, err)
163+
}
164+
}
165+
166+
func getMetrics(t *testing.T) *Metrics {
167+
exporter, err := prometheus.New()
168+
if err != nil {
169+
t.Errorf("failed to create Prometheus exporter: %v", err)
170+
}
171+
provider := metric.NewMeterProvider(metric.WithReader(exporter))
172+
metrics, err := registerMetricsWith(provider)
173+
if err != nil {
174+
t.Errorf("failed to register metrics with Prometheus provider: %v", err)
175+
}
176+
metrics.enabled = true
177+
178+
t.Cleanup(func() {
179+
if provider != nil {
180+
if err := provider.Shutdown(context.Background()); err != nil {
181+
t.Errorf("failed to shutdown provider: %v", err)
182+
}
183+
}
184+
})
185+
186+
return &metrics
187+
}
188+
189+
func mockMetricsRequest() *httptest.ResponseRecorder {
190+
handler := promhttp.Handler()
191+
req := httptest.NewRequest("GET", metricsEndpoint, nil)
192+
responseRecorder := httptest.NewRecorder()
193+
handler.ServeHTTP(responseRecorder, req)
194+
return responseRecorder
195+
}
196+
197+
func validateStatus(t *testing.T, responseRecorder *httptest.ResponseRecorder) {
198+
status := responseRecorder.Code
199+
h.Equals(t, http.StatusOK, status)
200+
}
201+
202+
// This method take response body got from Prometheus exporter as arg
203+
// Example:
204+
// # HELP go_goroutines Number of goroutines that currently exist.
205+
// # TYPE go_goroutines gauge
206+
// go_goroutines 6
207+
func getMetricsMap(body string) map[string]string {
208+
metricsMap := make(map[string]string)
209+
lines := strings.Split(body, "\n")
210+
for _, line := range lines {
211+
if len(strings.TrimSpace(line)) == 0 {
212+
continue
213+
}
214+
if strings.HasPrefix(strings.TrimSpace(line), "# ") {
215+
continue
216+
}
217+
parts := strings.SplitN(line, " ", 2)
218+
if len(parts) != 2 {
219+
continue
220+
}
221+
key := parts[0]
222+
value := parts[1]
223+
metricsMap[key] = value
224+
}
225+
return metricsMap
226+
}
227+
228+
func validateEventErrorTotal(t *testing.T, metricsMap map[string]string, expectedTotal int) {
229+
eventErrorTotalKey := fmt.Sprintf("events_error_total{event_error_where=\"%v\",otel_scope_name=\"%v\",otel_scope_version=\"\"}", mockErrorEvent, mockNth)
230+
actualValue, exists := metricsMap[eventErrorTotalKey]
231+
if !exists {
232+
actualValue = "0"
233+
}
234+
h.Equals(t, strconv.Itoa(expectedTotal), actualValue)
235+
}
236+
237+
func validateActionTotalV2(t *testing.T, metricsMap map[string]string, expectedTotal int, nodeStatus string) {
238+
actionTotalKey := fmt.Sprintf("actions_total{node_action=\"%v\",node_status=\"%v\",otel_scope_name=\"%v\",otel_scope_version=\"\"}", mockAction, nodeStatus, mockNth)
239+
actualValue, exists := metricsMap[actionTotalKey]
240+
if !exists {
241+
actualValue = "0"
242+
}
243+
h.Equals(t, strconv.Itoa(expectedTotal), actualValue)
244+
}

0 commit comments

Comments
 (0)