@@ -32,6 +32,7 @@ import (
 	"time"
 
 	"github.com/go-logr/logr"
+	k8srand "k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/utils/clock"
 
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
@@ -57,10 +58,11 @@ type shardProcessor interface {
 // shardProcessorFactory defines the signature for a function that creates a `shardProcessor`.
 // This enables dependency injection for testing.
 type shardProcessorFactory func(
+	ctx context.Context,
 	shard contracts.RegistryShard,
 	saturationDetector contracts.SaturationDetector,
-	clock clock.Clock,
-	expiryCleanupInterval time.Duration,
+	clock clock.WithTicker,
+	cleanupSweepInterval time.Duration,
 	enqueueChannelBufferSize int,
 	logger logr.Logger,
 ) shardProcessor
@@ -79,6 +81,14 @@ type managedWorker struct {
 //
 // The controller's `Run` loop executes periodically, acting as a garbage collector that keeps the pool of running
 // workers synchronized with the dynamic shard topology of the `FlowRegistry`.
+//
+// Request Lifecycle Management:
+//
+//  1. Asynchronous Finalization (Controller-Owned): The Controller actively monitors the request Context
+//     (TTL/Cancellation) in EnqueueAndWait. If the Context expires, the Controller immediately Finalizes the item and
+//     unblocks the caller.
+//  2. Synchronous Finalization (Processor-Owned): The Processor handles Dispatch, Capacity Rejection, and Shutdown.
+//  3. Cleanup (Processor-Owned): The Processor periodically sweeps externally finalized items to reclaim capacity.
 type FlowController struct {
 	// --- Immutable dependencies (set at construction) ---
 
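Both owners can race to finalize the same item: the Controller finalizes on context expiry while the Processor finalizes on dispatch or rejection. That only works if finalization is idempotent, with the first `Finalize` winning and `Done()` unblocking exactly once. A minimal sketch of that contract (illustrative names only, not the actual `internal.FlowItem` implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// Outcome stands in for types.QueueOutcome.
type Outcome int

type finalState struct {
	Outcome Outcome
	Err     error
}

// flowItem sketches the idempotent-finalization contract: the first caller of
// finalize wins, later calls are no-ops, and Done unblocks exactly once.
type flowItem struct {
	once  sync.Once
	done  chan struct{}
	state finalState
}

func newFlowItem() *flowItem { return &flowItem{done: make(chan struct{})} }

func (i *flowItem) finalize(out Outcome, err error) {
	i.once.Do(func() {
		i.state = finalState{Outcome: out, Err: err}
		close(i.done) // closed after the state write, so readers see a consistent value
	})
}

func (i *flowItem) Done() <-chan struct{} { return i.done }

func (i *flowItem) FinalState() finalState {
	<-i.done // block until finalized
	return i.state
}

func main() {
	item := newFlowItem()
	go item.finalize(1, nil) // e.g., the processor dispatches...
	go item.finalize(2, nil) // ...while the controller finalizes on context expiry
	fmt.Println(item.FinalState().Outcome) // exactly one outcome wins
}
```

Closing `done` after the state write gives waiters a happens-before guarantee, so `FinalState` needs no extra locking.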
@@ -129,18 +139,20 @@ func NewFlowController(
 
 	// Use the real shard processor implementation by default.
 	fc.shardProcessorFactory = func(
+		ctx context.Context,
 		shard contracts.RegistryShard,
 		saturationDetector contracts.SaturationDetector,
-		clock clock.Clock,
-		expiryCleanupInterval time.Duration,
+		clock clock.WithTicker,
+		cleanupSweepInterval time.Duration,
 		enqueueChannelBufferSize int,
 		logger logr.Logger,
 	) shardProcessor {
 		return internal.NewShardProcessor(
+			ctx,
 			shard,
 			saturationDetector,
 			clock,
-			expiryCleanupInterval,
+			cleanupSweepInterval,
 			enqueueChannelBufferSize,
 			logger)
 	}
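Because the factory is a plain function field, a same-package test can swap in a stub before any worker starts, which is the dependency injection the type's comment refers to. A hedged sketch (`stubProcessor` is a hypothetical test double, not a helper in this repo):

```go
// In a test, override the factory so every shard gets a stub processor.
fc.shardProcessorFactory = func(
	ctx context.Context,
	shard contracts.RegistryShard,
	saturationDetector contracts.SaturationDetector,
	clock clock.WithTicker,
	cleanupSweepInterval time.Duration,
	enqueueChannelBufferSize int,
	logger logr.Logger,
) shardProcessor {
	// A stubProcessor would implement Submit/SubmitOrBlock by recording calls,
	// letting tests assert on distribution behavior without real queues.
	return &stubProcessor{}
}
```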
@@ -189,63 +201,162 @@ func (fc *FlowController) run(ctx context.Context) {
 // stack and its `context.Context`. The system only needs to signal this specific goroutine to unblock it.
 //   - Direct Backpressure: If queues are full, `EnqueueAndWait` returns an error immediately, providing direct
 //     backpressure to the caller.
-func (fc *FlowController) EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error) {
+func (fc *FlowController) EnqueueAndWait(
+	ctx context.Context,
+	req types.FlowControlRequest,
+) (types.QueueOutcome, error) {
 	if req == nil {
 		return types.QueueOutcomeRejectedOther, errors.New("request cannot be nil")
 	}
-	effectiveTTL := req.InitialEffectiveTTL()
-	if effectiveTTL <= 0 {
-		effectiveTTL = fc.config.DefaultRequestTTL
-	}
-	enqueueTime := fc.clock.Now()
 
+	// 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL).
+	reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req)
+	defer cancel()
+
+	// 2. Enter the distribution loop to find a home for the request.
+	// This loop is responsible for retrying on ErrShardDraining.
 	for {
-		select {
+
+		select { // Non-blocking check on controller lifecycle.
 		case <-fc.parentCtx.Done():
 			return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, types.ErrFlowControllerNotRunning)
 		default:
-			// The controller is running, proceed.
 		}
 
-		// We must create a fresh `FlowItem` on each attempt since finalization is idempotent.
-		// However, we use the original, preserved `enqueueTime`.
-		item := internal.NewItem(req, effectiveTTL, enqueueTime)
-		if outcome, err := fc.distributeRequest(item); err != nil {
-			return outcome, fmt.Errorf("%w: %w", types.ErrRejected, err)
+		// Attempt to distribute the request once.
+		item, err := fc.tryDistribution(reqCtx, req, enqueueTime)
+		if err != nil {
+			// Distribution failed terminally (e.g., no shards, context cancelled during blocking submit).
+			// The item has already been finalized by tryDistribution.
+			finalState := item.FinalState()
+			return finalState.Outcome, finalState.Err
 		}
 
-		// Block until the request is finalized (dispatched, rejected, or evicted).
-		// The finalization logic internally monitors for context cancellation and TTL expiry.
-		finalState := <-item.Done()
-		if errors.Is(finalState.Err, contracts.ErrShardDraining) {
-			fc.logger.V(logutil.DEBUG).Info("Shard is draining, retrying request", "requestID", req.ID())
-			// Benign race with the chosen `contracts.RegistryShard` becoming Draining post selection but before the item was
-			// enqueued into its respective `contracts.ManagedQueue`. Simply try again.
+		// Distribution was successful; ownership of the item has been transferred to a processor.
+		// We now block in awaitFinalization until the request is finalized, either by the processor (e.g., dispatched
+		// or rejected) or by the controller itself (e.g., the caller's context was cancelled or the TTL expired).
+		outcome, err := fc.awaitFinalization(reqCtx, item)
+		if errors.Is(err, contracts.ErrShardDraining) {
+			// This is a benign race condition where the chosen shard started draining after acceptance.
+			fc.logger.V(logutil.DEBUG).Info("Selected shard is Draining, retrying request distribution",
+				"flowKey", req.FlowKey(), "requestID", req.ID())
+			// Introduce a small, randomized delay (1-10ms) to prevent tight spinning loops and thundering herds
+			// during retry scenarios (e.g., shard draining).
+			// TODO: Replace this with a more sophisticated backoff strategy when our data parallelism story matures.
+			// For now, this is more than sufficient.
+			jitterMs := k8srand.Intn(10) + 1
+			fc.clock.Sleep(time.Duration(jitterMs) * time.Millisecond)
 			continue
 		}
 
+		// The outcome is terminal (Dispatched, Evicted, or a non-retriable rejection).
+		return outcome, err
+	}
+}
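From the caller's side, the new signature makes the wait bounded by a caller-supplied context rather than only by the request's own TTL. A hedged usage sketch (the wrapping `admit` helper and the 5s timeout are illustrative, not part of this PR):

```go
// admit is a hypothetical caller of EnqueueAndWait.
func admit(fc *FlowController, req types.FlowControlRequest) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	outcome, err := fc.EnqueueAndWait(ctx, req)
	if err != nil {
		if errors.Is(err, types.ErrRejected) {
			// Terminal rejection: capacity exhaustion, no viable shards, or controller shutdown.
			return err
		}
		// Otherwise the item was evicted: the caller's context was cancelled, or the
		// TTL expired (types.ErrTTLExpired as the context cause).
		return err
	}
	_ = outcome // a dispatched outcome; proceed with the admitted request.
	return nil
}
```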
+
+var errNoShards = errors.New("no viable active shards available")
+
+// tryDistribution handles a single attempt to select a shard and submit a request.
+// If this function returns an error, it guarantees that the provided `item` has been finalized.
+func (fc *FlowController) tryDistribution(
+	reqCtx context.Context,
+	req types.FlowControlRequest,
+	enqueueTime time.Time,
+) (*internal.FlowItem, error) {
+	// Calculate effective TTL for item initialization (reqCtx is the enforcement mechanism).
+	effectiveTTL := fc.config.DefaultRequestTTL
+	if deadline, ok := reqCtx.Deadline(); ok {
+		if ttl := deadline.Sub(enqueueTime); ttl > 0 {
+			effectiveTTL = ttl
+		}
+	}
+
+	// We must create a fresh FlowItem on each attempt as finalization is per-lifecycle.
+	item := internal.NewItem(req, effectiveTTL, enqueueTime)
+
+	candidates, err := fc.selectDistributionCandidates(item.OriginalRequest().FlowKey())
+	if err != nil {
+		outcome := types.QueueOutcomeRejectedOther
+		if errors.Is(err, errNoShards) {
+			outcome = types.QueueOutcomeRejectedCapacity
+		}
+		finalErr := fmt.Errorf("%w: request not accepted: %w", types.ErrRejected, err)
+		item.FinalizeWithOutcome(outcome, finalErr)
+		return item, finalErr
+	}
+
+	outcome, err := fc.distributeRequest(reqCtx, item, candidates)
+	if err == nil {
+		// Success: Ownership of the item has been transferred to the processor.
+		return item, nil
+	}
+
+	// For any distribution error, the controller retains ownership and must finalize the item.
+	var finalErr error
+	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+		// We propagate the original context error here; EnqueueAndWait will rely on item.FinalState().Err.
+		finalErr = err
+		item.Finalize(context.Cause(reqCtx))
+	} else { // Non-context failure (e.g., processor shutdown).
+		finalErr = fmt.Errorf("%w: request not accepted: %w", types.ErrRejected, err)
+		item.FinalizeWithOutcome(outcome, finalErr)
+	}
+	return item, finalErr
+}
+
+// awaitFinalization blocks until an item is finalized, either by the processor (synchronously) or by the controller
+// itself due to context expiry (asynchronously).
+func (fc *FlowController) awaitFinalization(
+	reqCtx context.Context,
+	item *internal.FlowItem,
+) (types.QueueOutcome, error) {
+	select {
+	case <-reqCtx.Done():
+		// Asynchronous Finalization (Controller-initiated):
+		// The request Context expired (Cancellation/TTL) while the item was being processed.
+		cause := context.Cause(reqCtx)
+		item.Finalize(cause)
+
+		// The processor will eventually discard this "zombie" item during its cleanup sweep.
+		finalState := item.FinalState()
+		return finalState.Outcome, finalState.Err
+
+	case finalState := <-item.Done():
+		// Synchronous Finalization (Processor-initiated):
+		// The processor finalized the item (Dispatch, Reject, Shutdown).
 		return finalState.Outcome, finalState.Err
 	}
 }
 
-// distributeRequest implements a flow-aware, two-phase "Join-Shortest-Queue-by-Bytes" (JSQ-Bytes) distribution strategy
-// with graceful backpressure. It selects the optimal worker for a given item and attempts to submit it.
-//
-// The algorithm operates as follows:
-//  1. Candidate Selection: It identifies all Active shards for the item's flow and ranks them by the current byte size
-//     of that flow's queue, from least to most loaded.
-//  2. Phase 1 (Non-blocking Fast Failover): It iterates through the ranked candidates and attempts a non-blocking
-//     submission. The first successful submission wins.
-//  3. Phase 2 (Blocking Fallback): If all non-blocking attempts fail, it performs a single blocking submission to the
-//     least-loaded candidate, providing backpressure.
-func (fc *FlowController) distributeRequest(item *internal.FlowItem) (types.QueueOutcome, error) {
-	key := item.OriginalRequest().FlowKey()
-	reqID := item.OriginalRequest().ID()
-	type candidate struct {
-		processor shardProcessor
-		shardID   string
-		byteSize  uint64
+// createRequestContext derives the context that governs a request's lifecycle, enforcing the TTL deadline.
+func (fc *FlowController) createRequestContext(
+	ctx context.Context,
+	req types.FlowControlRequest,
+) (context.Context, context.CancelFunc, time.Time) {
+	enqueueTime := fc.clock.Now()
+	effectiveTTL := req.InitialEffectiveTTL()
+	if effectiveTTL <= 0 {
+		effectiveTTL = fc.config.DefaultRequestTTL
 	}
+
+	if effectiveTTL > 0 {
+		reqCtx, cancel := context.WithDeadlineCause(ctx, enqueueTime.Add(effectiveTTL), types.ErrTTLExpired)
+		return reqCtx, cancel, enqueueTime
+	}
+	reqCtx, cancel := context.WithCancel(ctx)
+	return reqCtx, cancel, enqueueTime
+}
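Using `context.WithDeadlineCause` (Go 1.21+) rather than plain `WithDeadline` is what lets the controller tell a TTL expiry apart from an ordinary cancellation: `ctx.Err()` still reports `DeadlineExceeded`, but `context.Cause` surfaces the richer error that `item.Finalize(context.Cause(reqCtx))` records. A minimal standalone illustration of the stdlib behavior (`errTTLExpired` stands in for `types.ErrTTLExpired`):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errTTLExpired = errors.New("ttl expired") // stand-in for types.ErrTTLExpired

func main() {
	ctx, cancel := context.WithDeadlineCause(
		context.Background(), time.Now().Add(10*time.Millisecond), errTTLExpired)
	defer cancel()

	<-ctx.Done()
	fmt.Println(ctx.Err())          // context.DeadlineExceeded
	fmt.Println(context.Cause(ctx)) // ttl expired: the error the controller finalizes with
}
```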
+
+// candidate holds the information needed to evaluate a shard as a potential target for a request.
+type candidate struct {
+	processor shardProcessor
+	shardID   string
+	byteSize  uint64
+}
+
+// selectDistributionCandidates identifies all Active shards for the given flow and ranks them by the current byte
+// size of that flow's queue, from least to most loaded.
+func (fc *FlowController) selectDistributionCandidates(key types.FlowKey) ([]candidate, error) {
 	var candidates []candidate
 	err := fc.registry.WithConnection(key, func(conn contracts.ActiveFlowConnection) error {
 		shards := conn.ActiveShards()
@@ -262,41 +373,58 @@ func (fc *FlowController) distributeRequest(item *internal.FlowItem) (types.Queu
 		return nil
 	})
 	if err != nil {
-		return types.QueueOutcomeRejectedOther, fmt.Errorf("failed to acquire lease for request %q (flow %s): %w",
-			reqID, key, err)
+		return nil, fmt.Errorf("failed to acquire lease for flow %s: %w", key, err)
 	}
 
 	if len(candidates) == 0 {
-		return types.QueueOutcomeRejectedCapacity, fmt.Errorf("no viable Active shards available for request %q (flow %s)",
-			reqID, key)
+		return nil, fmt.Errorf("%w for flow %s", errNoShards, key)
 	}
 
 	slices.SortFunc(candidates, func(a, b candidate) int {
 		return cmp.Compare(a.byteSize, b.byteSize)
 	})
 
-	// --- Phase 1: Fast, non-blocking failover attempt ---
+	return candidates, nil
+}
+
+// distributeRequest implements a flow-aware, two-phase "Join-Shortest-Queue-by-Bytes" (JSQ-Bytes) distribution strategy
+// with graceful backpressure. It attempts to submit an item to the best-ranked candidate from the provided list.
+//
+// The algorithm operates as follows:
+//  1. Phase 1 (Non-blocking Fast Failover): It iterates through the ranked candidates and attempts a non-blocking
+//     submission. The first successful submission wins.
+//  2. Phase 2 (Blocking Fallback): If all non-blocking attempts fail, it performs a single blocking submission to the
+//     least-loaded candidate, providing backpressure.
+//
+// The provided context (ctx) is used for the blocking submission phase (SubmitOrBlock).
+//
+// Ownership Contract:
+//   - Returns nil: Success. Ownership is transferred to the Processor.
+//   - Returns error: Failure (context expiry, shutdown, etc.). Ownership is retained by the Controller, which MUST
+//     finalize the item.
+func (fc *FlowController) distributeRequest(
+	ctx context.Context,
+	item *internal.FlowItem,
+	candidates []candidate,
+) (types.QueueOutcome, error) {
+	reqID := item.OriginalRequest().ID()
 	for _, c := range candidates {
 		if err := c.processor.Submit(item); err == nil {
-			return types.QueueOutcomeNotYetFinalized, nil // Success
+			return types.QueueOutcomeNotYetFinalized, nil
 		}
-		fc.logger.V(logutil.DEBUG).Info("Processor busy during fast failover, trying next candidate",
+		fc.logger.V(logutil.TRACE).Info("Processor busy during fast failover, trying next candidate",
 			"shardID", c.shardID, "requestID", reqID)
 	}
 
-	// --- Phase 2: All processors busy. Attempt a single blocking send to the best candidate. ---
+	// All processors are busy. Attempt a single blocking submission to the least-loaded candidate.
 	bestCandidate := candidates[0]
-	fc.logger.V(logutil.DEBUG).Info("All processors busy, attempting blocking submit to best candidate",
-		"shardID", bestCandidate.shardID, "requestID", reqID, "queueByteSize", bestCandidate.byteSize)
-
-	err = bestCandidate.processor.SubmitOrBlock(item.OriginalRequest().Context(), item)
+	fc.logger.V(logutil.TRACE).Info("All processors busy, attempting blocking submit to best candidate",
+		"shardID", bestCandidate.shardID, "requestID", reqID)
+	err := bestCandidate.processor.SubmitOrBlock(ctx, item)
 	if err != nil {
-		// If even the blocking attempt fails (e.g., context cancelled or processor shut down), the request is definitively
-		// rejected.
-		return types.QueueOutcomeRejectedCapacity, fmt.Errorf(
-			"all viable shard processors are at capacity for request %q (flow %s): %w", reqID, key, err)
+		return types.QueueOutcomeRejectedOther, fmt.Errorf("%w: request not accepted: %w", types.ErrRejected, err)
 	}
-	return types.QueueOutcomeNotYetFinalized, nil
+	return types.QueueOutcomeNotYetFinalized, nil // Success, ownership transferred.
 }
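The two phases map onto a standard Go channel idiom. The processor's implementation is not part of this diff, but `Submit` presumably performs a non-blocking send while `SubmitOrBlock` waits on either the channel or the context; a sketch under that assumption (the `proc` type and `errBusy` sentinel are illustrative):

```go
// Hypothetical processor internals, shown only to illustrate the two call shapes;
// "item" is whatever the enqueue channel carries (internal.FlowItem in this PR).
type proc struct {
	enqueue chan any
}

var errBusy = errors.New("processor busy") // illustrative sentinel

// Submit is the Phase 1 primitive: it never blocks the distribution loop.
func (p *proc) Submit(item any) error {
	select {
	case p.enqueue <- item:
		return nil
	default:
		return errBusy // the caller fails over to the next-ranked shard
	}
}

// SubmitOrBlock is the Phase 2 primitive: it applies backpressure, bounded by ctx.
func (p *proc) SubmitOrBlock(ctx context.Context, item any) error {
	select {
	case p.enqueue <- item:
		return nil
	case <-ctx.Done():
		return context.Cause(ctx) // surfaces TTL expiry or cancellation to the controller
	}
}
```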
 
 // getOrStartWorker implements the lazy-loading and startup of shard processors.
@@ -311,6 +439,7 @@ func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *manag
 	// Construct a new worker, but do not start its processor goroutine yet.
 	processorCtx, cancel := context.WithCancel(fc.parentCtx)
 	processor := fc.shardProcessorFactory(
+		processorCtx,
 		shard,
 		fc.saturationDetector,
 		fc.clock,
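Only the construction site is visible in this hunk; the enclosing `getOrStartWorker` presumably follows the usual check-then-`LoadOrStore` pattern for lazily starting one worker per shard. A hedged sketch of that shape (the `workers` map, `shard.ID()`, the config field names, and the `Run` entry point are assumptions, not code from this diff):

```go
func (fc *FlowController) getOrStartWorker(shard contracts.RegistryShard) *managedWorker {
	if w, ok := fc.workers.Load(shard.ID()); ok {
		return w.(*managedWorker) // fast path: worker already running
	}

	// Construct a new worker, but do not start its processor goroutine yet.
	processorCtx, cancel := context.WithCancel(fc.parentCtx)
	processor := fc.shardProcessorFactory(processorCtx, shard, fc.saturationDetector, fc.clock,
		fc.config.CleanupSweepInterval, fc.config.EnqueueChannelBufferSize, fc.logger)
	w := &managedWorker{processor: processor, cancel: cancel}

	// Another goroutine may have won the race to create this worker; defer to it.
	if actual, loaded := fc.workers.LoadOrStore(shard.ID(), w); loaded {
		cancel()
		return actual.(*managedWorker)
	}
	go processor.Run(processorCtx) // assumed worker entry point
	return w
}
```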