Skip to content

Commit 262b3de

Browse files
committed
add grpc subscribers metric.
Signed-off-by: morvencao <[email protected]> rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED
1 parent a229e25 commit 262b3de

File tree

3 files changed

+84
-37
lines changed

3 files changed

+84
-37
lines changed

pkg/cloudevents/server/grpc/broker.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/heartbeat"
87
"sync"
98
"time"
109

10+
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/heartbeat"
11+
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/metrics"
12+
1113
"k8s.io/apimachinery/pkg/api/errors"
1214

1315
cloudevents "github.com/cloudevents/sdk-go/v2"
@@ -134,6 +136,7 @@ func (bkr *GRPCBroker) register(
134136
}
135137

136138
klog.V(4).Infof("register a subscriber %s (cluster name = %s)", id, clusterName)
139+
metrics.IncGRPCCESubscribersMetric(clusterName, dataType.String())
137140

138141
return id, errChan
139142
}
@@ -147,6 +150,7 @@ func (bkr *GRPCBroker) unregister(id string) {
147150
if sub, exists := bkr.subscribers[id]; exists {
148151
close(sub.errChan)
149152
delete(bkr.subscribers, id)
153+
metrics.DecGRPCCESubscribersMetric(sub.clusterName, sub.dataType.String())
150154
}
151155
}
152156

pkg/cloudevents/server/grpc/metrics/metrics.go

Lines changed: 69 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"google.golang.org/grpc/status"
1010

1111
"github.com/cloudevents/sdk-go/v2/binding"
12+
cetypes "github.com/cloudevents/sdk-go/v2/types"
1213
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
1314
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol"
1415
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
@@ -21,37 +22,51 @@ const grpcCEMetricsSubsystem = "grpc_server_ce"
2122

2223
// Names of the labels added to metrics:
2324
const (
25+
grpcCEMetricsClusterLabel = "consumer"
2426
grpcCEMetricsDataTypeLabel = "data_type"
2527
grpcCEMetricsMethodLabel = "method"
2628
grpcMetricsCodeLabel = "grpc_code"
2729
)
2830

29-
// grpcCEMetricsLabels - Array of labels added to grpc server metrics for cloudevents:
30-
var grpcCEMetricsLabels = []string{
31+
// grpcCEMetricsCommonLabels - Array of common labels added to grpc server metrics for cloudevents:
32+
var grpcCEMetricsCommonLabels = []string{
33+
grpcCEMetricsClusterLabel,
3134
grpcCEMetricsDataTypeLabel,
32-
grpcCEMetricsMethodLabel,
3335
}
3436

37+
// grpcCEMetricsHandlerLabels - Array of handler labels added to grpc server metrics for cloudevents:
38+
var grpcCEMetricsHandlerLabels = append(grpcCEMetricsCommonLabels, grpcCEMetricsMethodLabel)
39+
3540
// grpcCEMetricsAllLabels - Array of all labels added to grpc server metrics for cloudevents:
36-
var grpcCEMetricsAllLabels = append(grpcCEMetricsLabels, grpcMetricsCodeLabel)
41+
var grpcCEMetricsAllLabels = append(grpcCEMetricsHandlerLabels, grpcMetricsCodeLabel)
3742

3843
// Names of the grpc server metrics for cloudevents:
3944
const (
45+
subscribersMetric = "subscribers"
4046
calledCountMetric = "called_total"
4147
processedCountMetric = "processed_total"
4248
processingDurationMetric = "processing_duration_seconds"
4349
messageReceivedCountMetric = "msg_received_total"
4450
messageSentCountMetric = "msg_sent_total"
4551
)
4652

53+
// grpcCESubscribersMetric is a gauge metric that tracks the number of registered
54+
// subscribers for cloudevents on the gRPC server.
55+
var grpcCESubscribersMetric = k8smetrics.NewGaugeVec(&k8smetrics.GaugeOpts{
56+
Subsystem: grpcCEMetricsSubsystem,
57+
Name: subscribersMetric,
58+
StabilityLevel: k8smetrics.ALPHA,
59+
Help: "Number of registered subscribers for cloudevents on the grpc server.",
60+
}, grpcCEMetricsCommonLabels)
61+
4762
// grpcCECalledCountMetric is a counter metric that tracks the total number of
4863
// RPC requests for cloudevents called on the gRPC server.
4964
var grpcCECalledCountMetric = k8smetrics.NewCounterVec(&k8smetrics.CounterOpts{
5065
Subsystem: grpcCEMetricsSubsystem,
5166
Name: calledCountMetric,
5267
StabilityLevel: k8smetrics.ALPHA,
5368
Help: "Total number of RPC requests for cloudevents called on the grpc server.",
54-
}, grpcCEMetricsLabels)
69+
}, grpcCEMetricsHandlerLabels)
5570

5671
// grpcCEMessageReceivedCountMetric is a counter metric that tracks the total number of
5772
// messages for cloudevents received on the gRPC server.
@@ -60,7 +75,7 @@ var grpcCEMessageReceivedCountMetric = k8smetrics.NewCounterVec(&k8smetrics.Coun
6075
Name: messageReceivedCountMetric,
6176
StabilityLevel: k8smetrics.ALPHA,
6277
Help: "Total number of messages for cloudevents received on the gRPC server.",
63-
}, grpcCEMetricsLabels)
78+
}, grpcCEMetricsHandlerLabels)
6479

6580
// grpcCEMessageSentCountMetric is a counter metric that tracks the total number of
6681
// messages for cloudevents sent by the gRPC server.
@@ -69,7 +84,7 @@ var grpcCEMessageSentCountMetric = k8smetrics.NewCounterVec(&k8smetrics.CounterO
6984
Name: messageSentCountMetric,
7085
StabilityLevel: k8smetrics.ALPHA,
7186
Help: "Total number of messages for cloudevents sent by the gRPC server.",
72-
}, grpcCEMetricsLabels)
87+
}, grpcCEMetricsHandlerLabels)
7388

7489
// grpcCEProcessedCountMetric is a counter metric that tracks the total number of
7590
// RPC requests for cloudevents processed on the server, regardless of success or failure.
@@ -106,61 +121,72 @@ func NewCloudEventsMetricsUnaryInterceptor() grpc.UnaryServerInterceptor {
106121
}
107122

108123
// initialize defaults for error cases
124+
cluster := "unknown"
109125
dataType := "unknown"
110126

111127
pubReq, ok := req.(*pbv1.PublishRequest)
112128
if !ok {
113129
err := fmt.Errorf("invalid request type for Publish method")
114-
recordCloudEventsMetrics(dataType, method, err, startTime)
130+
recordCloudEventsMetrics(cluster, dataType, method, err, startTime)
115131
return nil, err
116132
}
117133
// convert the request to cloudevent and extract the source
118134
evt, err := binding.ToEvent(ctx, protocol.NewMessage(pubReq.Event))
119135
if err != nil {
120136
err = fmt.Errorf("failed to convert to cloudevent: %v", err)
121-
recordCloudEventsMetrics(dataType, method, err, startTime)
137+
recordCloudEventsMetrics(cluster, dataType, method, err, startTime)
138+
return nil, err
139+
}
140+
141+
// extract the cluster name from event extensions
142+
clusterVal, err := cetypes.ToString(evt.Context.GetExtensions()[types.ExtensionClusterName])
143+
if err != nil {
144+
err = fmt.Errorf("failed to get clustername extension: %v", err)
145+
recordCloudEventsMetrics(cluster, dataType, method, err, startTime)
122146
return nil, err
123147
}
148+
cluster = clusterVal
124149

125150
// extract the data type from event type
126151
eventType, err := types.ParseCloudEventsType(evt.Type())
127152
if err != nil {
128153
err = fmt.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
129-
recordCloudEventsMetrics(dataType, method, err, startTime)
154+
recordCloudEventsMetrics(cluster, dataType, method, err, startTime)
130155
return nil, err
131156
}
132157
dataType = eventType.CloudEventsDataType.String()
133158

134-
grpcCECalledCountMetric.WithLabelValues(dataType, method).Inc()
135-
grpcCEMessageReceivedCountMetric.WithLabelValues(dataType, method).Inc()
159+
grpcCECalledCountMetric.WithLabelValues(cluster, dataType, method).Inc()
160+
grpcCEMessageReceivedCountMetric.WithLabelValues(cluster, dataType, method).Inc()
136161
// call rpc handler to handle RPC request
137162
resp, err := handler(ctx, req)
138163
duration := time.Since(startTime).Seconds()
139-
grpcCEMessageSentCountMetric.WithLabelValues(dataType, method).Inc()
164+
grpcCEMessageSentCountMetric.WithLabelValues(cluster, dataType, method).Inc()
140165

141166
// get status code from error
142167
status := statusFromError(err)
143168
code := status.Code()
144-
grpcCEProcessedCountMetric.WithLabelValues(dataType, method, code.String()).Inc()
145-
grpcCEProcessingDurationMetric.WithLabelValues(dataType, method, code.String()).Observe(duration)
169+
grpcCEProcessedCountMetric.WithLabelValues(cluster, dataType, method, code.String()).Inc()
170+
grpcCEProcessingDurationMetric.WithLabelValues(cluster, dataType, method, code.String()).Observe(duration)
146171

147172
return resp, err
148173
}
149174
}
150175

151-
func recordCloudEventsMetrics(dataType, method string, err error, startTime time.Time) {
176+
func recordCloudEventsMetrics(cluster, dataType, method string, err error, startTime time.Time) {
152177
duration := time.Since(startTime).Seconds()
153178
status := statusFromError(err)
154179
code := status.Code()
155-
grpcCEProcessedCountMetric.WithLabelValues(dataType, method, code.String()).Inc()
156-
grpcCEProcessingDurationMetric.WithLabelValues(dataType, method, code.String()).Observe(duration)
180+
grpcCEProcessedCountMetric.WithLabelValues(cluster, dataType, method, code.String()).Inc()
181+
grpcCEProcessingDurationMetric.WithLabelValues(cluster, dataType, method, code.String()).Observe(duration)
157182
}
158183

159184
// wrappedCloudEventsMetricsStream wraps a grpc.ServerStream, capturing the request source
160185
// emitting metrics for the stream interceptor.
161186
type wrappedCloudEventsMetricsStream struct {
162-
dataType *string
163-
method string
187+
clusterName *string
188+
dataType *string
189+
method string
164190
grpc.ServerStream
165191
ctx context.Context
166192
}
@@ -178,10 +204,11 @@ func (w *wrappedCloudEventsMetricsStream) RecvMsg(m interface{}) error {
178204
return fmt.Errorf("invalid request type for Subscribe method")
179205
}
180206

181-
if w.dataType != nil {
207+
if w.clusterName != nil && w.dataType != nil {
208+
*w.clusterName = subReq.ClusterName
182209
*w.dataType = subReq.DataType
183-
grpcCECalledCountMetric.WithLabelValues(*w.dataType, w.method).Inc()
184-
grpcCEMessageReceivedCountMetric.WithLabelValues(*w.dataType, w.method).Inc()
210+
grpcCECalledCountMetric.WithLabelValues(*w.clusterName, *w.dataType, w.method).Inc()
211+
grpcCEMessageReceivedCountMetric.WithLabelValues(*w.clusterName, *w.dataType, w.method).Inc()
185212
}
186213

187214
return nil
@@ -194,16 +221,16 @@ func (w *wrappedCloudEventsMetricsStream) SendMsg(m interface{}) error {
194221
return err
195222
}
196223

197-
if w.dataType != nil && *w.dataType != "" {
198-
grpcCEMessageSentCountMetric.WithLabelValues(*w.dataType, w.method).Inc()
224+
if w.clusterName != nil && w.dataType != nil && *w.clusterName != "" && *w.dataType != "" {
225+
grpcCEMessageSentCountMetric.WithLabelValues(*w.clusterName, *w.dataType, w.method).Inc()
199226
}
200227

201228
return nil
202229
}
203230

204-
// newWrappedCloudEventsMetricsStream creates a wrappedCloudEventsMetricsStream with the specified type reference.
205-
func newWrappedCloudEventsMetricsStream(dataType *string, method string, ctx context.Context, ss grpc.ServerStream) grpc.ServerStream {
206-
return &wrappedCloudEventsMetricsStream{dataType, method, ss, ctx}
231+
// newWrappedCloudEventsMetricsStream creates a wrappedCloudEventsMetricsStream with the specified type and cluster reference.
232+
func newWrappedCloudEventsMetricsStream(clusterName, dataType *string, method string, ctx context.Context, ss grpc.ServerStream) grpc.ServerStream {
233+
return &wrappedCloudEventsMetricsStream{clusterName, dataType, method, ss, ctx}
207234
}
208235

209236
// NewCloudEventsMetricsStreamInterceptor creates a stream server interceptor for server metrics.
@@ -217,15 +244,16 @@ func NewCloudEventsMetricsStreamInterceptor() grpc.StreamServerInterceptor {
217244
}
218245

219246
dataType := ""
247+
cluster := ""
220248
// create a wrapped stream to capture the source and emit metrics
221-
wrappedCEMetricsStream := newWrappedCloudEventsMetricsStream(&dataType, method, stream.Context(), stream)
249+
wrappedCEMetricsStream := newWrappedCloudEventsMetricsStream(&cluster, &dataType, method, stream.Context(), stream)
222250
// call rpc handler to handle RPC request
223251
err := handler(srv, wrappedCEMetricsStream)
224252

225253
// get status code from error
226254
status := statusFromError(err)
227255
code := status.Code()
228-
grpcCEProcessedCountMetric.WithLabelValues(dataType, method, code.String()).Inc()
256+
grpcCEProcessedCountMetric.WithLabelValues(cluster, dataType, method, code.String()).Inc()
229257

230258
return err
231259
}
@@ -264,10 +292,21 @@ func SplitMethod(fullMethod string) (service, method string) {
264292
// Register all the grpc server metrics for cloudevents.
265293
func CloudEventsGRPCMetrics() []k8smetrics.Registerable {
266294
return []k8smetrics.Registerable{
295+
grpcCESubscribersMetric,
267296
grpcCECalledCountMetric,
268297
grpcCEProcessedCountMetric,
269298
grpcCEProcessingDurationMetric,
270299
grpcCEMessageReceivedCountMetric,
271300
grpcCEMessageSentCountMetric,
272301
}
273302
}
303+
304+
// IncGRPCCESubscribersMetric increments the grpcCESubscribersMetric by 1 for the given cluster and dataType.
305+
func IncGRPCCESubscribersMetric(cluster, dataType string) {
306+
grpcCESubscribersMetric.WithLabelValues(cluster, dataType).Inc()
307+
}
308+
309+
// DecGRPCCESubscribersMetric decrements the grpcCESubscribersMetric by 1 for the given cluster and dataType.
310+
func DecGRPCCESubscribersMetric(cluster, dataType string) {
311+
grpcCESubscribersMetric.WithLabelValues(cluster, dataType).Dec()
312+
}

pkg/server/grpc/metrics/metrics_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ func TestGRPCMetricsInterceptor(t *testing.T) {
105105
"grpc_server_msg_received_bytes_total",
106106
"grpc_server_msg_sent_bytes_total",
107107
"grpc_server_handled_total",
108+
"grpc_server_ce_subscribers",
108109
"grpc_server_ce_called_total",
109110
"grpc_server_ce_msg_received_total",
110111
"grpc_server_ce_msg_sent_total",
@@ -124,20 +125,23 @@ func TestGRPCMetricsInterceptor(t *testing.T) {
124125
var expectedMetrics = `# HELP grpc_server_active_connections [ALPHA] Current number of active gRPC server connections.
125126
# TYPE grpc_server_active_connections gauge
126127
grpc_server_active_connections{local_addr="bufconn",remote_addr="bufconn"} 1
128+
# HELP grpc_server_ce_subscribers [ALPHA] Number of registered subscribers for cloudevents on the grpc server.
129+
# TYPE grpc_server_ce_subscribers gauge
130+
grpc_server_ce_subscribers{consumer="cluster1",data_type="io.open-cluster-management.works.v1alpha1.manifestbundles"} 1
127131
# HELP grpc_server_ce_called_total [ALPHA] Total number of RPC requests for cloudevents called on the grpc server.
128132
# TYPE grpc_server_ce_called_total counter
129-
grpc_server_ce_called_total{data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Publish"} 1
130-
grpc_server_ce_called_total{data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Subscribe"} 1
133+
grpc_server_ce_called_total{consumer="cluster1",data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Publish"} 1
134+
grpc_server_ce_called_total{consumer="cluster1",data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Subscribe"} 1
131135
# HELP grpc_server_ce_msg_received_total [ALPHA] Total number of messages for cloudevents received on the gRPC server.
132136
# TYPE grpc_server_ce_msg_received_total counter
133-
grpc_server_ce_msg_received_total{data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Publish"} 1
134-
grpc_server_ce_msg_received_total{data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Subscribe"} 1
137+
grpc_server_ce_msg_received_total{consumer="cluster1",data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Publish"} 1
138+
grpc_server_ce_msg_received_total{consumer="cluster1",data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Subscribe"} 1
135139
# HELP grpc_server_ce_msg_sent_total [ALPHA] Total number of messages for cloudevents sent by the gRPC server.
136140
# TYPE grpc_server_ce_msg_sent_total counter
137-
grpc_server_ce_msg_sent_total{data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Publish"} 1
141+
grpc_server_ce_msg_sent_total{consumer="cluster1",data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",method="Publish"} 1
138142
# HELP grpc_server_ce_processed_total [ALPHA] Total number of RPC requests for cloudevents processed on the server, regardless of success or failure.
139143
# TYPE grpc_server_ce_processed_total counter
140-
grpc_server_ce_processed_total{data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",grpc_code="OK",method="Publish"} 1
144+
grpc_server_ce_processed_total{consumer="cluster1",data_type="io.open-cluster-management.works.v1alpha1.manifestbundles",grpc_code="OK",method="Publish"} 1
141145
# HELP grpc_server_msg_received_bytes_total [ALPHA] Total number of bytes received on the gRPC server.
142146
# TYPE grpc_server_msg_received_bytes_total counter
143147
grpc_server_msg_received_bytes_total{grpc_method="Publish",grpc_service="io.cloudevents.v1.CloudEventService",grpc_type="unary"} 2286

0 commit comments

Comments
 (0)