Skip to content

Commit c71339b

Browse files
LukeAVanDrie authored and
BenjaminBraunDev committed
feat: Add initial Flow Control metrics (kubernetes-sigs#1714)
* feat: Add initial Flow Control metrics Introduces initial Prometheus metrics for the experimental Flow Control layer in EPP. This change adds the following metrics: - inference_extension_flow_control_request_queue_duration_seconds: A histogram to track the total time requests spend in the Flow Control layer, from invocation of EnqueueAndWait to final outcome. - inference_extension_flow_control_queue_size: A gauge to track the number of requests currently being managed by the Flow Control layer. These metrics are labeled by fairness_id, priority, and outcome (for the duration metric). * Rebase onto HEAD and resolve conflicts.
1 parent c91e716 commit c71339b

File tree

5 files changed

+184
-12
lines changed

5 files changed

+184
-12
lines changed

pkg/epp/flowcontrol/controller/controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"errors"
2929
"fmt"
3030
"slices"
31+
"strconv"
3132
"sync"
3233
"time"
3334

@@ -38,6 +39,7 @@ import (
3839
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
3940
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller/internal"
4041
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
42+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4143
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4244
)
4345

@@ -209,14 +211,19 @@ func (fc *FlowController) EnqueueAndWait(
209211
return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil")
210212
}
211213

214+
flowKey := req.FlowKey()
215+
fairnessID := flowKey.ID
216+
priority := strconv.Itoa(flowKey.Priority)
217+
metrics.IncFlowControlQueueSize(fairnessID, priority)
218+
defer metrics.DecFlowControlQueueSize(fairnessID, priority)
219+
212220
// 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL).
213221
reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req)
214222
defer cancel()
215223

216224
// 2. Enter the distribution loop to find a home for the request.
217225
// This loop is responsible for retrying on ErrShardDraining.
218226
for {
219-
220227
select { // Non-blocking check on controller lifecycle.
221228
case <-fc.parentCtx.Done():
222229
return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning)

pkg/epp/flowcontrol/controller/internal/item.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"strconv"
2324
"sync"
2425
"sync/atomic"
2526
"time"
2627

2728
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
29+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
2830
)
2931

3032
// FinalState encapsulates the terminal outcome of a FlowItem's lifecycle.
@@ -154,6 +156,10 @@ func (fi *FlowItem) finalizeInternal(outcome types.QueueOutcome, err error) {
154156
// Atomically store the pointer. This is the critical memory barrier that publishes the state safely.
155157
fi.finalState.Store(finalState)
156158

159+
duration := time.Since(fi.enqueueTime)
160+
flowKey := fi.originalRequest.FlowKey()
161+
metrics.RecordFlowControlRequestQueueDuration(flowKey.ID, strconv.Itoa(flowKey.Priority), outcome.String(), duration)
162+
157163
fi.done <- finalState
158164
close(fi.done)
159165
}

pkg/epp/metrics/metrics.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,28 @@ var (
422422
},
423423
[]string{"commit", "build_ref"},
424424
)
425+
426+
// Flow Control Metrics
427+
flowControlRequestQueueDuration = prometheus.NewHistogramVec(
428+
prometheus.HistogramOpts{
429+
Subsystem: InferenceExtension,
430+
Name: "flow_control_request_queue_duration_seconds",
431+
Help: metricsutil.HelpMsgWithStability("Distribution of the total time requests spend in the EPP flow control layer, measured from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA),
432+
Buckets: []float64{
433+
0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0,
434+
},
435+
},
436+
[]string{"fairness_id", "priority", "outcome"},
437+
)
438+
439+
flowControlQueueSize = prometheus.NewGaugeVec(
440+
prometheus.GaugeOpts{
441+
Subsystem: InferenceExtension,
442+
Name: "flow_control_queue_size",
443+
Help: metricsutil.HelpMsgWithStability("Current number of requests being actively managed by the EPP flow control layer, from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA),
444+
},
445+
[]string{"fairness_id", "priority"},
446+
)
425447
)
426448

427449
var registerMetrics sync.Once
@@ -473,7 +495,8 @@ func Register(customCollectors ...prometheus.Collector) {
473495
metrics.Registry.MustRegister(PrefixCacheSize)
474496
metrics.Registry.MustRegister(PrefixCacheHitRatio)
475497
metrics.Registry.MustRegister(PrefixCacheHitLength)
476-
498+
metrics.Registry.MustRegister(flowControlRequestQueueDuration)
499+
metrics.Registry.MustRegister(flowControlQueueSize)
477500
for _, collector := range customCollectors {
478501
metrics.Registry.MustRegister(collector)
479502
}
@@ -500,6 +523,8 @@ func Reset() {
500523
PrefixCacheSize.Reset()
501524
PrefixCacheHitRatio.Reset()
502525
PrefixCacheHitLength.Reset()
526+
flowControlRequestQueueDuration.Reset()
527+
flowControlQueueSize.Reset()
503528

504529
requestTPOT.Reset()
505530
requestTTFT.Reset()
@@ -770,6 +795,21 @@ func RecordInferenceExtensionInfo(commitSha, buildRef string) {
770795
InferenceExtensionInfo.WithLabelValues(commitSha, buildRef).Set(1)
771796
}
772797

798+
// RecordFlowControlRequestQueueDuration records the duration a request spent in the Flow Control layer.
799+
func RecordFlowControlRequestQueueDuration(fairnessID, priority, outcome string, duration time.Duration) {
800+
flowControlRequestQueueDuration.WithLabelValues(fairnessID, priority, outcome).Observe(duration.Seconds())
801+
}
802+
803+
// IncFlowControlQueueSize increments the Flow Control queue size gauge.
804+
func IncFlowControlQueueSize(fairnessID, priority string) {
805+
flowControlQueueSize.WithLabelValues(fairnessID, priority).Inc()
806+
}
807+
808+
// DecFlowControlQueueSize decrements the Flow Control queue size gauge.
809+
func DecFlowControlQueueSize(fairnessID, priority string) {
810+
flowControlQueueSize.WithLabelValues(fairnessID, priority).Dec()
811+
}
812+
773813
// SetTTFTSLOThreshold sets the TTFT SLO threshold for a model.
774814
// This allows dynamic threshold management and makes the threshold visible in metrics.
775815
func SetTTFTSLOThreshold(modelName, targetModelName string, threshold float64) {

0 commit comments

Comments
 (0)