Skip to content

Commit 62e4bd9

Browse files
committed
feat: Remove deleted pod from prefix-cache scorer
Signed-off-by: Kfir Toledo <[email protected]>
1 parent a634467 commit 62e4bd9

File tree

6 files changed

+155
-5
lines changed

6 files changed

+155
-5
lines changed

cmd/epp/runner/runner.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func (r *Runner) Run(ctx context.Context) error {
233233
runtime.SetBlockProfileRate(1)
234234
}
235235

236-
err = r.parsePluginsConfiguration(ctx)
236+
err = r.parsePluginsConfiguration(ctx, datastore)
237237
if err != nil {
238238
setupLog.Error(err, "Failed to parse plugins configuration")
239239
return err
@@ -310,7 +310,7 @@ func (r *Runner) registerInTreePlugins() {
310310
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
311311
}
312312

313-
func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
313+
func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error {
314314
if *configText == "" && *configFile == "" {
315315
return nil // configuring through code, not through file
316316
}
@@ -329,8 +329,9 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
329329
}
330330

331331
r.registerInTreePlugins()
332-
handle := plugins.NewEppHandle(ctx)
332+
handle := plugins.NewEppHandle(ctx, ds.PodList)
333333
config, err := loader.LoadConfig(configBytes, handle, logger)
334+
334335
if err != nil {
335336
return fmt.Errorf("failed to load the configuration - %w", err)
336337
}

pkg/epp/plugins/handle.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package plugins
1919
import (
2020
"context"
2121
"fmt"
22+
23+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2224
)
2325

2426
// Handle provides plugins a set of standard data and tools to work with
@@ -27,6 +29,9 @@ type Handle interface {
2729
Context() context.Context
2830

2931
HandlePlugins
32+
33+
// PodList lists pods matching the given predicate.
34+
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
3035
}
3136

3237
// HandlePlugins defines a set of APIs to work with instantiated plugins
@@ -44,10 +49,14 @@ type HandlePlugins interface {
4449
GetAllPluginsWithNames() map[string]Plugin
4550
}
4651

52+
// PodListFunc is a function type that filters and returns a list of pod metrics
53+
type PodListFunc func(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
54+
4755
// eppHandle is an implementation of the interface plugins.Handle
4856
type eppHandle struct {
4957
ctx context.Context
5058
HandlePlugins
59+
podList PodListFunc
5160
}
5261

5362
// Context returns a context the plugins can use, if they need one
@@ -84,12 +93,18 @@ func (h *eppHandlePlugins) GetAllPluginsWithNames() map[string]Plugin {
8493
return h.plugins
8594
}
8695

87-
func NewEppHandle(ctx context.Context) Handle {
96+
// PodList lists pods matching the given predicate.
97+
func (h *eppHandle) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
98+
return h.podList(predicate)
99+
}
100+
101+
func NewEppHandle(ctx context.Context, podList PodListFunc) Handle {
88102
return &eppHandle{
89103
ctx: ctx,
90104
HandlePlugins: &eppHandlePlugins{
91105
plugins: map[string]Plugin{},
92106
},
107+
podList: podList,
93108
}
94109
}
95110

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,35 @@ func (i *indexer) reportLRUSize(ctx context.Context, interval time.Duration) {
149149
i.mu.RUnlock()
150150
}
151151
}
152+
153+
// RemovePod removes a pod and its associated entries from the indexer.
154+
func (i *indexer) RemovePod(pod ServerID) {
155+
i.mu.RLock()
156+
lruCache, exists := i.podToLRU[pod]
157+
i.mu.RUnlock()
158+
159+
if !exists {
160+
return
161+
}
162+
163+
// Remove all hashes associated with the pod from hashToPods (triggers eviction callbacks).
164+
for _, hash := range lruCache.Keys() {
165+
lruCache.Remove(hash)
166+
}
167+
168+
i.mu.Lock()
169+
delete(i.podToLRU, pod)
170+
i.mu.Unlock()
171+
}
172+
173+
// Pods returns the list of all pods currently tracked in the indexer.
174+
func (i *indexer) Pods() []ServerID {
175+
i.mu.RLock()
176+
defer i.mu.RUnlock()
177+
178+
pods := make([]ServerID, 0, len(i.podToLRU))
179+
for pod := range i.podToLRU {
180+
pods = append(pods, pod)
181+
}
182+
return pods
183+
}

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,63 @@ func TestIndexer_AddAndGet(t *testing.T) {
4646
servers = i.Get(BlockHash(4))
4747
assert.Empty(t, servers, "Cache should not contain non-existent hash")
4848
}
49+
50+
func TestIndexer_RemovePodAndEviction(t *testing.T) {
51+
const indexerSize = 10
52+
53+
i := newIndexer(context.Background(), indexerSize)
54+
55+
server1 := ServerID{Namespace: "default", Name: "server1"}
56+
server2 := ServerID{Namespace: "default", Name: "server2"}
57+
58+
// Add indexerSize hashes to both servers
59+
var hashes []BlockHash
60+
for j := 0; j < indexerSize; j++ {
61+
h := BlockHash(j)
62+
hashes = append(hashes, h)
63+
i.Add([]BlockHash{h}, server1)
64+
i.Add([]BlockHash{h}, server2)
65+
}
66+
67+
// Ensure all entries are added
68+
assert.Equal(t, indexerSize, i.podToLRU[server1].Len(), "server1 should have 10 entries")
69+
assert.Equal(t, indexerSize, i.podToLRU[server2].Len(), "server2 should have 10 entries")
70+
71+
// Ensure each hash in hashToPods maps to both server1 and server2
72+
for _, h := range hashes {
73+
pods := i.hashToPods[h]
74+
assert.Len(t, pods, 2, "Each hash should be associated with exactly 2 pods")
75+
assert.Contains(t, pods, server1, "hash should be associated with server1")
76+
assert.Contains(t, pods, server2, "hash should be associated with server2")
77+
}
78+
79+
// Add one more hash, BlockHash(indexerSize), to server1 → its LRU is already full, so this should evict BlockHash(0)
80+
evictedHash := BlockHash(0)
81+
newHash := BlockHash(indexerSize)
82+
i.Add([]BlockHash{newHash}, server1)
83+
84+
// server1 LRU should still be at max capacity
85+
assert.Equal(t, indexerSize, i.podToLRU[server1].Len(), "server1 LRU should maintain max size")
86+
87+
// BlockHash(0) should no longer have server1 in hashToPods
88+
pods := i.Get(evictedHash)
89+
assert.NotContains(t, pods, server1, "server1 should be evicted from hashToPods for hash 0")
90+
assert.Contains(t, pods, server2, "server2 should still have hash 0")
91+
92+
// Remove server2
93+
i.RemovePod(server2)
94+
95+
// hashToPods for hash 0 should now be empty
96+
pods = i.Get(evictedHash)
97+
assert.NotContains(t, pods, server2, "server2 should be removed from hash 0")
98+
assert.Empty(t, pods, "hash 0 should have no pods after both eviction and removal")
99+
100+
// All remaining hashes should map only to server1
101+
for hash, pods := range i.hashToPods {
102+
assert.Len(t, pods, 1, "hash %v should have only 1 pod after server2 removal", hash)
103+
assert.Contains(t, pods, server1, "hash %v should only contain server1", hash)
104+
}
105+
106+
// Ensure hashToPods contains exactly indexerSize hashes (post-eviction and server2 removal)
107+
assert.Len(t, i.hashToPods, indexerSize, "hashToPods should contain %d hashes after cleanup", indexerSize)
108+
}

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ import (
2121
"encoding/binary"
2222
"encoding/json"
2323
"fmt"
24+
"time"
2425

2526
"github.com/cespare/xxhash/v2"
2627
k8stypes "k8s.io/apimachinery/pkg/types"
2728
"sigs.k8s.io/controller-runtime/pkg/log"
2829

30+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2931
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3032
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
3133
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
@@ -56,6 +58,10 @@ const (
5658
PrefixCachePluginType = "prefix-cache-scorer"
5759
)
5860

61+
const (
62+
PodActiveCheckInterval = 2 * time.Minute
63+
)
64+
5965
var DefaultConfig = Config{
6066
HashBlockSize: DefaultHashBlockSize,
6167
MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
@@ -86,6 +92,8 @@ type podSet map[ServerID]struct{}
8692
type Indexer interface {
8793
Get(hash BlockHash) podSet
8894
Add(hashes []BlockHash, server ServerID)
95+
RemovePod(server ServerID)
96+
Pods() []ServerID
8997
}
9098

9199
// BlockHash is a hash of the block of request body.
@@ -140,7 +148,9 @@ func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, handle
140148
}
141149
}
142150

143-
return New(handle.Context(), parameters).WithName(name), nil
151+
p := New(handle.Context(), parameters).WithName(name)
152+
go p.CleanUpInactivePods(handle.Context(), handle)
153+
return p, nil
144154
}
145155

146156
// New initializes a new prefix Plugin and returns its pointer.
@@ -246,6 +256,33 @@ func (p *Plugin) matchLongestPrefix(ctx context.Context, hashes []BlockHash) map
246256
return res
247257
}
248258

259+
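// CleanUpInactivePods blocks until ctx is done and, every PodActiveCheckInterval, removes from the indexer any pod that handle.PodList no longer reports; callers are expected to run it in its own goroutine.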
260+
func (m *Plugin) CleanUpInactivePods(ctx context.Context, handle plugins.Handle) {
261+
logger := log.FromContext(ctx).V(logutil.VERBOSE)
262+
ticker := time.NewTicker(PodActiveCheckInterval)
263+
defer ticker.Stop()
264+
265+
for {
266+
select {
267+
case <-ctx.Done():
268+
return
269+
case <-ticker.C:
270+
activePodMetrics := handle.PodList(func(_ backendmetrics.PodMetrics) bool { return true })
271+
activePods := make(map[ServerID]struct{}, len(activePodMetrics))
272+
for _, pm := range activePodMetrics {
273+
activePods[ServerID(pm.GetPod().NamespacedName)] = struct{}{}
274+
}
275+
276+
for _, pod := range m.indexer.Pods() {
277+
if _, ok := activePods[pod]; !ok {
278+
m.indexer.RemovePod(pod)
279+
logger.Info("Removed pod not in active set", "pod", pod)
280+
}
281+
}
282+
}
283+
}
284+
}
285+
249286
// hashPrompt divides the prompt into blocks and calculate the prefix cache for each block.
250287
// hash(0) is the hash of the model name, since different models generally don't share prefix cache.
251288
// For block i, hash(i) = hash(block i content, hash(i-1)).

test/utils/handle.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package utils
1919
import (
2020
"context"
2121

22+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2223
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2324
)
2425

@@ -33,6 +34,10 @@ func (h *testHandle) Context() context.Context {
3334
return h.ctx
3435
}
3536

37+
func (h *testHandle) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
38+
return []backendmetrics.PodMetrics{}
39+
}
40+
3641
type testHandlePlugins struct {
3742
plugins map[string]plugins.Plugin
3843
}

0 commit comments

Comments
 (0)