Skip to content

Commit c94f1f6

Browse files
committed
feat: Add GetActivePods functionto plugins handle and datastore
Signed-off-by: Kfir Toledo <[email protected]>
1 parent 63e9468 commit c94f1f6

File tree

7 files changed

+165
-5
lines changed

7 files changed

+165
-5
lines changed

cmd/epp/runner/runner.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func (r *Runner) Run(ctx context.Context) error {
230230
}
231231
}
232232

233-
err = r.parsePluginsConfiguration(ctx)
233+
err = r.parsePluginsConfiguration(ctx, datastore)
234234
if err != nil {
235235
setupLog.Error(err, "Failed to parse plugins configuration")
236236
return err
@@ -306,7 +306,7 @@ func (r *Runner) registerInTreePlugins() {
306306
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
307307
}
308308

309-
func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
309+
func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error {
310310
if *configText == "" && *configFile == "" {
311311
return nil // configuring through code, not through file
312312
}
@@ -325,8 +325,9 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
325325
}
326326

327327
r.registerInTreePlugins()
328-
handle := plugins.NewEppHandle(ctx)
328+
handle := plugins.NewEppHandle(ctx, ds.GetActivePods)
329329
config, err := loader.LoadConfig(configBytes, handle, logger)
330+
330331
if err != nil {
331332
return fmt.Errorf("failed to load the configuration - %w", err)
332333
}

pkg/epp/datastore/datastore.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type Datastore interface {
6363
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
6464
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
6565
PodDelete(namespacedName types.NamespacedName)
66+
GetActivePods() []types.NamespacedName
6667

6768
// Clears the store state, happens when the pool gets deleted.
6869
Clear()
@@ -232,6 +233,16 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
232233
return ok
233234
}
234235

236+
// GetActivePods returns a list of all active pods.
237+
func (ds *datastore) GetActivePods() []types.NamespacedName {
238+
var namespacedNames []types.NamespacedName
239+
ds.pods.Range(func(k, _ any) bool {
240+
namespacedNames = append(namespacedNames, k.(types.NamespacedName))
241+
return true
242+
})
243+
return namespacedNames
244+
}
245+
235246
func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
236247
v, ok := ds.pods.LoadAndDelete(namespacedName)
237248
if ok {

pkg/epp/plugins/handle.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package plugins
1919
import (
2020
"context"
2121
"fmt"
22+
23+
"k8s.io/apimachinery/pkg/types"
2224
)
2325

2426
// Handle provides plugins a set of standard data and tools to work with
@@ -27,6 +29,9 @@ type Handle interface {
2729
Context() context.Context
2830

2931
HandlePlugins
32+
33+
// GetActivePods returns a list of all active pods
34+
GetActivePods() []types.NamespacedName
3035
}
3136

3237
// HandlePlugins defines a set of APIs to work with instantiated plugins
@@ -44,10 +49,14 @@ type HandlePlugins interface {
4449
GetAllPluginsWithNames() map[string]Plugin
4550
}
4651

52+
// GetActivePodsFunc is a function that returns a list of all active pods.
53+
type GetActivePodsFunc func() []types.NamespacedName
54+
4755
// eppHandle is an implementation of the interface plugins.Handle
4856
type eppHandle struct {
4957
ctx context.Context
5058
HandlePlugins
59+
getActivePods GetActivePodsFunc
5160
}
5261

5362
// Context returns a context the plugins can use, if they need one
@@ -84,7 +93,12 @@ func (h *eppHandlePlugins) GetAllPluginsWithNames() map[string]Plugin {
8493
return h.plugins
8594
}
8695

87-
func NewEppHandle(ctx context.Context) Handle {
96+
// GetActivePods returns a function that returns a list of all active pods
97+
func (h *eppHandle) GetActivePods() []types.NamespacedName {
98+
return h.getActivePods()
99+
}
100+
101+
func NewEppHandle(ctx context.Context, getActivePods GetActivePodsFunc) Handle {
88102
return &eppHandle{
89103
ctx: ctx,
90104
HandlePlugins: &eppHandlePlugins{

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,23 @@ func (i *indexer) ReportLRUSize(interval time.Duration) {
149149
i.mu.RUnlock()
150150
}
151151
}
152+
153+
// RemovePod removes a pod and its associated entries from the indexer.
154+
func (i *indexer) RemovePod(pod ServerID) {
155+
i.mu.RLock()
156+
lruCache, exists := i.podToLRU[pod]
157+
i.mu.RUnlock()
158+
159+
if !exists {
160+
return
161+
}
162+
163+
// Remove all hashes associated with the pod from hashToPods (triggers eviction callbacks).
164+
for _, hash := range lruCache.Keys() {
165+
lruCache.Remove(hash)
166+
}
167+
168+
i.mu.Lock()
169+
delete(i.podToLRU, pod)
170+
i.mu.Unlock()
171+
}

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,63 @@ func TestIndexer_AddAndGet(t *testing.T) {
4545
servers = i.Get(BlockHash(4))
4646
assert.Empty(t, servers, "Cache should not contain non-existent hash")
4747
}
48+
49+
func TestIndexer_RemovePodAndEviction(t *testing.T) {
50+
const indexerSize = 10
51+
52+
i := newIndexer(indexerSize)
53+
54+
server1 := ServerID{Namespace: "default", Name: "server1"}
55+
server2 := ServerID{Namespace: "default", Name: "server2"}
56+
57+
// Add indexerSize hashes to both servers
58+
var hashes []BlockHash
59+
for j := 0; j < indexerSize; j++ {
60+
h := BlockHash(j)
61+
hashes = append(hashes, h)
62+
i.Add([]BlockHash{h}, server1)
63+
i.Add([]BlockHash{h}, server2)
64+
}
65+
66+
// Ensure all entries are added
67+
assert.Equal(t, indexerSize, i.podToLRU[server1].Len(), "server1 should have 10 entries")
68+
assert.Equal(t, indexerSize, i.podToLRU[server2].Len(), "server2 should have 10 entries")
69+
70+
// Ensure each hash in hashToPods maps to both server1 and server2
71+
for _, h := range hashes {
72+
pods := i.hashToPods[h]
73+
assert.Len(t, pods, 2, "Each hash should be associated with exactly 2 pods")
74+
assert.Contains(t, pods, server1, "hash should be associated with server1")
75+
assert.Contains(t, pods, server2, "hash should be associated with server2")
76+
}
77+
78+
// Add indexerSize hash to server1 → should evict BlockHash(0)
79+
evictedHash := BlockHash(0)
80+
newHash := BlockHash(indexerSize)
81+
i.Add([]BlockHash{newHash}, server1)
82+
83+
// server1 LRU should still be at max capacity
84+
assert.Equal(t, indexerSize, i.podToLRU[server1].Len(), "server1 LRU should maintain max size")
85+
86+
// BlockHash(0) should no longer have server1 in hashToPods
87+
pods := i.Get(evictedHash)
88+
assert.NotContains(t, pods, server1, "server1 should be evicted from hashToPods for hash 0")
89+
assert.Contains(t, pods, server2, "server2 should still have hash 0")
90+
91+
// Remove server2
92+
i.RemovePod(server2)
93+
94+
// hashToPods for hash 0 should now be empty
95+
pods = i.Get(evictedHash)
96+
assert.NotContains(t, pods, server2, "server2 should be removed from hash 0")
97+
assert.Empty(t, pods, "hash 0 should have no pods after both eviction and removal")
98+
99+
// All remaining hashes should map only to server1
100+
for hash, pods := range i.hashToPods {
101+
assert.Len(t, pods, 1, "hash %v should have only 1 pod after server2 removal", hash)
102+
assert.Contains(t, pods, server1, "hash %v should only contain server1", hash)
103+
}
104+
105+
// Ensure hashToPods contains exactly indexerSize hashes (post-eviction and server2 removal)
106+
assert.Len(t, i.hashToPods, indexerSize, "hashToPods should contain %d hashes after cleanup", indexerSize)
107+
}

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/binary"
2222
"encoding/json"
2323
"fmt"
24+
"time"
2425

2526
"github.com/cespare/xxhash/v2"
2627
k8stypes "k8s.io/apimachinery/pkg/types"
@@ -56,6 +57,11 @@ const (
5657
PrefixCachePluginType = "prefix-cache-scorer"
5758
)
5859

60+
const (
61+
PodActiveCheckInterval = 1 * time.Minute
62+
PodInactivityTimeout = 5 * time.Minute
63+
)
64+
5965
var DefaultConfig = Config{
6066
HashBlockSize: DefaultHashBlockSize,
6167
MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
@@ -86,6 +92,7 @@ type podSet map[ServerID]struct{}
8692
type Indexer interface {
8793
Get(hash BlockHash) podSet
8894
Add(hashes []BlockHash, server ServerID)
95+
RemovePod(server ServerID)
8996
}
9097

9198
// BlockHash is a hash of the block of request body.
@@ -140,7 +147,9 @@ func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, handle
140147
}
141148
}
142149

143-
return New(handle.Context(), parameters).WithName(name), nil
150+
p := New(handle.Context(), parameters).WithName(name)
151+
go p.StartPodActiveWatcher(handle.Context(), handle)
152+
return p, nil
144153
}
145154

146155
// New initializes a new prefix Plugin and returns its pointer.
@@ -245,6 +254,45 @@ func (p *Plugin) matchLongestPrefix(ctx context.Context, hashes []BlockHash) map
245254
return res
246255
}
247256

257+
// StartPodActiveWatcher starts a goroutine that watches for active pods.
258+
func (m *Plugin) StartPodActiveWatcher(ctx context.Context, handle plugins.Handle) {
259+
logger := log.FromContext(ctx).V(logutil.VERBOSE)
260+
261+
ticker := time.NewTicker(PodActiveCheckInterval)
262+
defer ticker.Stop()
263+
264+
podLastSeen := make(map[ServerID]time.Time)
265+
266+
for {
267+
select {
268+
case <-ctx.Done():
269+
return
270+
case <-ticker.C:
271+
now := time.Now()
272+
activePods := handle.GetActivePods()
273+
274+
// Track active pods
275+
activeSet := make(map[ServerID]struct{}, len(activePods))
276+
for _, np := range activePods {
277+
id := ServerID(np)
278+
activeSet[id] = struct{}{}
279+
podLastSeen[id] = now
280+
}
281+
282+
// Remove stale pods
283+
for pod, lastSeen := range podLastSeen {
284+
if _, stillActive := activeSet[pod]; !stillActive {
285+
if now.Sub(lastSeen) > PodInactivityTimeout {
286+
m.indexer.RemovePod(pod)
287+
delete(podLastSeen, pod)
288+
logger.Info("Removed inactive pod from prefix cache", "pod", pod)
289+
}
290+
}
291+
}
292+
}
293+
}
294+
}
295+
248296
// hashPrompt divides the prompt into blocks and calculate the prefix cache for each block.
249297
// hash(0) is the hash of the model name, since different models generally don't share prefix cache.
250298
// For block i, hash(i) = hash(block i content, hash(i-1)).

test/utils/handle.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package utils
1919
import (
2020
"context"
2121

22+
k8stypes "k8s.io/apimachinery/pkg/types"
23+
2224
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2325
)
2426

@@ -33,6 +35,10 @@ func (h *testHandle) Context() context.Context {
3335
return h.ctx
3436
}
3537

38+
func (h *testHandle) GetActivePods() []k8stypes.NamespacedName {
39+
return []k8stypes.NamespacedName{}
40+
}
41+
3642
type testHandlePlugins struct {
3743
plugins map[string]plugins.Plugin
3844
}

0 commit comments

Comments
 (0)