Skip to content

Commit bd614af

Browse files
authored
feat: Add initial Flow Control metrics (#1714)
* feat: Add initial Flow Control metrics Introduces initial Prometheus metrics for the experimental Flow Contorl layer in EPP. This change adds the following metrics: - inference_extension_flow_control_request_queue_duration_seconds: A histogram to track the total time requests spend in the Flow Control layer, from invocation of EnqueueAndWait to final outcome. - inference_extension_flow_control_queue_size: A gauge to track the number of requests currently being managed by the Flow Control layer. These metrics are labeled by fairness_id, priority, and outcome (for the duration metric). * Rebase onto HEAd and resolve conflicts.
1 parent b377d7c commit bd614af

File tree

5 files changed

+184
-11
lines changed

5 files changed

+184
-11
lines changed

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"errors"
2929
"fmt"
3030
"slices"
31+
"strconv"
3132
"sync"
3233
"time"
3334

@@ -38,6 +39,7 @@ import (
3839
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
3940
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller/internal"
4041
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
42+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4143
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4244
)
4345

@@ -209,14 +211,19 @@ func (fc *FlowController) EnqueueAndWait(
209211
return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil")
210212
}
211213

214+
flowKey := req.FlowKey()
215+
fairnessID := flowKey.ID
216+
priority := strconv.Itoa(flowKey.Priority)
217+
metrics.IncFlowControlQueueSize(fairnessID, priority)
218+
defer metrics.DecFlowControlQueueSize(fairnessID, priority)
219+
212220
// 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL).
213221
reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req)
214222
defer cancel()
215223

216224
// 2. Enter the distribution loop to find a home for the request.
217225
// This loop is responsible for retrying on ErrShardDraining.
218226
for {
219-
220227
select { // Non-blocking check on controller lifecycle.
221228
case <-fc.parentCtx.Done():
222229
return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning)

pkg/epp/flowcontrol/controller/internal/item.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"strconv"
2324
"sync"
2425
"sync/atomic"
2526
"time"
2627

2728
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
29+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
2830
)
2931

3032
// FinalState encapsulates the terminal outcome of a FlowItem's lifecycle.
@@ -154,6 +156,10 @@ func (fi *FlowItem) finalizeInternal(outcome types.QueueOutcome, err error) {
154156
// Atomically store the pointer. This is the critical memory barrier that publishes the state safely.
155157
fi.finalState.Store(finalState)
156158

159+
duration := time.Since(fi.enqueueTime)
160+
flowKey := fi.originalRequest.FlowKey()
161+
metrics.RecordFlowControlRequestQueueDuration(flowKey.ID, strconv.Itoa(flowKey.Priority), outcome.String(), duration)
162+
157163
fi.done <- finalState
158164
close(fi.done)
159165
}

pkg/epp/metrics/metrics.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,28 @@ var (
235235
},
236236
[]string{"commit", "build_ref"},
237237
)
238+
239+
// Flow Control Metrics
240+
flowControlRequestQueueDuration = prometheus.NewHistogramVec(
241+
prometheus.HistogramOpts{
242+
Subsystem: InferenceExtension,
243+
Name: "flow_control_request_queue_duration_seconds",
244+
Help: metricsutil.HelpMsgWithStability("Distribution of the total time requests spend in the EPP flow control layer, measured from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA),
245+
Buckets: []float64{
246+
0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0,
247+
},
248+
},
249+
[]string{"fairness_id", "priority", "outcome"},
250+
)
251+
252+
flowControlQueueSize = prometheus.NewGaugeVec(
253+
prometheus.GaugeOpts{
254+
Subsystem: InferenceExtension,
255+
Name: "flow_control_queue_size",
256+
Help: metricsutil.HelpMsgWithStability("Current number of requests being actively managed by the EPP flow control layer, from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA),
257+
},
258+
[]string{"fairness_id", "priority"},
259+
)
238260
)
239261

240262
var registerMetrics sync.Once
@@ -260,6 +282,8 @@ func Register(customCollectors ...prometheus.Collector) {
260282
metrics.Registry.MustRegister(PrefixCacheSize)
261283
metrics.Registry.MustRegister(PrefixCacheHitRatio)
262284
metrics.Registry.MustRegister(PrefixCacheHitLength)
285+
metrics.Registry.MustRegister(flowControlRequestQueueDuration)
286+
metrics.Registry.MustRegister(flowControlQueueSize)
263287
for _, collector := range customCollectors {
264288
metrics.Registry.MustRegister(collector)
265289
}
@@ -286,6 +310,8 @@ func Reset() {
286310
PrefixCacheSize.Reset()
287311
PrefixCacheHitRatio.Reset()
288312
PrefixCacheHitLength.Reset()
313+
flowControlRequestQueueDuration.Reset()
314+
flowControlQueueSize.Reset()
289315
}
290316

291317
// RecordRequstCounter records the number of requests.
@@ -414,3 +440,18 @@ func RecordPrefixCacheMatch(matchedLength, totalLength int) {
414440
func RecordInferenceExtensionInfo(commitSha, buildRef string) {
415441
InferenceExtensionInfo.WithLabelValues(commitSha, buildRef).Set(1)
416442
}
443+
444+
// RecordFlowControlRequestQueueDuration records the duration a request spent in the Flow Control layer.
445+
func RecordFlowControlRequestQueueDuration(fairnessID, priority, outcome string, duration time.Duration) {
446+
flowControlRequestQueueDuration.WithLabelValues(fairnessID, priority, outcome).Observe(duration.Seconds())
447+
}
448+
449+
// IncFlowControlQueueSize increments the Flow Control queue size gauge.
450+
func IncFlowControlQueueSize(fairnessID, priority string) {
451+
flowControlQueueSize.WithLabelValues(fairnessID, priority).Inc()
452+
}
453+
454+
// DecFlowControlQueueSize decrements the Flow Control queue size gauge.
455+
func DecFlowControlQueueSize(fairnessID, priority string) {
456+
flowControlQueueSize.WithLabelValues(fairnessID, priority).Dec()
457+
}

0 commit comments

Comments
 (0)