Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/volcano-sh/kthena
go 1.24.0

require (
github.com/agiledragon/gomonkey/v2 v2.13.0
github.com/alicebob/miniredis/v2 v2.35.0
github.com/cespare/xxhash v1.1.0
github.com/gammazero/deque v1.0.0
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ github.com/Masterminds/sprig/v3 v3.3.0 h1:mQh0Yrg1XPo6vjYXgtf5OtijNAKJRNcTdOOGZe
github.com/Masterminds/sprig/v3 v3.3.0/go.mod h1:Zy1iXRYNqNLUolqCpL4uhk6SHUMAOSCzdgBfDb35Lz0=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/agiledragon/gomonkey/v2 v2.13.0 h1:B24Jg6wBI1iB8EFR1c+/aoTg7QN/Cum7YffG8KMIyYo=
github.com/agiledragon/gomonkey/v2 v2.13.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=
github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI=
github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down Expand Up @@ -110,7 +108,6 @@ github.com/google/pprof v0.0.0-20250923004556-9e5a51aed1e8 h1:ZI8gCoCjGzPsum4L21
github.com/google/pprof v0.0.0-20250923004556-9e5a51aed1e8/go.mod h1:I6V7YzU0XDpsHqbsyrghnFZLO1gwK6NPTNvmetQIk9U=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo=
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand All @@ -129,7 +126,6 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
Expand Down Expand Up @@ -219,8 +215,6 @@ github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/cast v1.8.0 h1:gEN9K4b8Xws4EX0+a0reLmhq8moKn7ntRlQYgjPeCDk=
Expand Down Expand Up @@ -279,7 +273,6 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -309,7 +302,6 @@ golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
Expand Down
21 changes: 0 additions & 21 deletions licenses/github.com/agiledragon/gomonkey/v2/LICENSE

This file was deleted.

56 changes: 25 additions & 31 deletions pkg/kthena-router/controller/modelserver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"
"time"

"github.com/agiledragon/gomonkey/v2"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
Expand All @@ -35,11 +34,28 @@ import (
kthenafake "github.com/volcano-sh/kthena/client-go/clientset/versioned/fake"
informersv1alpha1 "github.com/volcano-sh/kthena/client-go/informers/externalversions"
aiv1alpha1 "github.com/volcano-sh/kthena/pkg/apis/networking/v1alpha1"
"github.com/volcano-sh/kthena/pkg/kthena-router/backend"
"github.com/volcano-sh/kthena/pkg/kthena-router/datastore"
"github.com/volcano-sh/kthena/pkg/kthena-router/utils"
)

type fakePodRuntimeInspector struct{}

func (fakePodRuntimeInspector) GetPodMetrics(_ string, _ *corev1.Pod, _ map[string]*dto.Histogram) (map[string]float64, map[string]*dto.Histogram) {
return map[string]float64{
utils.GPUCacheUsage: 0.5,
utils.RequestWaitingNum: 10,
utils.RequestRunningNum: 5,
}, nil
}

func (fakePodRuntimeInspector) GetPodModels(_ string, _ *corev1.Pod) ([]string, error) {
return []string{"test-model"}, nil
}

func newStoreWithMockBackend() datastore.Store {
return datastore.New(datastore.WithPodRuntimeInspector(fakePodRuntimeInspector{}))
}

func TestModelServerController_ModelServerLifecycle(t *testing.T) {
// Create fake clients
kubeClient := kubefake.NewSimpleClientset()
Expand All @@ -50,7 +66,7 @@ func TestModelServerController_ModelServerLifecycle(t *testing.T) {
kthenaInformerFactory := informersv1alpha1.NewSharedInformerFactory(kthenaClient, 0)

// Create store
store := datastore.New()
store := newStoreWithMockBackend()

// Create controller
controller := NewModelServerController(
Expand Down Expand Up @@ -259,9 +275,6 @@ func TestModelServerController_ModelServerLifecycle(t *testing.T) {
}

func TestModelServerController_PodLifecycle(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

// Create fake clients
kubeClient := kubefake.NewSimpleClientset()
kthenaClient := kthenafake.NewSimpleClientset()
Expand Down Expand Up @@ -290,7 +303,7 @@ func TestModelServerController_PodLifecycle(t *testing.T) {
kthenaInformerFactory := informersv1alpha1.NewSharedInformerFactory(kthenaClient, 0)

// Create store
store := datastore.New()
store := newStoreWithMockBackend()

// Create controller
controller := NewModelServerController(
Expand Down Expand Up @@ -506,7 +519,7 @@ func TestModelServerController_ErrorHandling(t *testing.T) {
kthenaInformerFactory := informersv1alpha1.NewSharedInformerFactory(kthenaClient, 0)

// Create store
store := datastore.New()
store := newStoreWithMockBackend()

// Create controller
controller := NewModelServerController(
Expand Down Expand Up @@ -550,7 +563,7 @@ func TestModelServerController_WorkQueueProcessing(t *testing.T) {
kthenaInformerFactory := informersv1alpha1.NewSharedInformerFactory(kthenaClient, 0)

// Create store
store := datastore.New()
store := newStoreWithMockBackend()

// Create controller
controller := NewModelServerController(
Expand Down Expand Up @@ -625,7 +638,7 @@ func TestModelServerController_PodSelectionLogic(t *testing.T) {
kthenaInformerFactory := informersv1alpha1.NewSharedInformerFactory(kthenaClient, 0)

// Create store
store := datastore.New()
store := newStoreWithMockBackend()

// Create controller
controller := NewModelServerController(
Expand Down Expand Up @@ -815,7 +828,7 @@ func TestModelServerController_ComprehensiveLifecycleTest(t *testing.T) {
defer close(stopCh)

// Create controller and store
store := datastore.New()
store := newStoreWithMockBackend()
controller := NewModelServerController(
kthenaInformerFactory,
kubeInformerFactory,
Expand Down Expand Up @@ -899,9 +912,6 @@ func TestModelServerController_ComprehensiveLifecycleTest(t *testing.T) {
// 3. Then we sync the second modelserver (ms2)
// 4. Verify that GetPodsByModelServer(ms2) returns all pods correctly
func TestModelServerController_SharedPods(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

// Create fake clients
kubeClient := kubefake.NewSimpleClientset()
kthenaClient := kthenafake.NewSimpleClientset()
Expand Down Expand Up @@ -992,7 +1002,7 @@ func TestModelServerController_SharedPods(t *testing.T) {
kthenaInformerFactory := informersv1alpha1.NewSharedInformerFactory(kthenaClient, 0)

// Create store
store := datastore.New()
store := newStoreWithMockBackend()

// Create controller
controller := NewModelServerController(
Expand Down Expand Up @@ -1121,19 +1131,3 @@ func waitForObjectInCache(t *testing.T, timeout time.Duration, checkFunc func()
}
}
}

// Helper function to setup mock for backend calls
func setupMockBackend() *gomonkey.Patches {
patch := gomonkey.NewPatches()
patch.ApplyFunc(backend.GetPodMetrics, func(backend string, pod *corev1.Pod, previousHistogram map[string]*dto.Histogram) (map[string]float64, map[string]*dto.Histogram) {
return map[string]float64{
utils.GPUCacheUsage: 0.5,
utils.RequestWaitingNum: 10,
utils.RequestRunningNum: 5,
}, map[string]*dto.Histogram{}
})
patch.ApplyFunc(backend.GetPodModels, func(backend string, pod *corev1.Pod) ([]string, error) {
return []string{"test-model"}, nil
})
return patch
}
79 changes: 23 additions & 56 deletions pkg/kthena-router/datastore/ordering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sync"
"testing"

"github.com/agiledragon/gomonkey/v2"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"istio.io/istio/pkg/util/sets"
Expand All @@ -29,7 +28,6 @@ import (
"k8s.io/apimachinery/pkg/types"

aiv1alpha1 "github.com/volcano-sh/kthena/pkg/apis/networking/v1alpha1"
"github.com/volcano-sh/kthena/pkg/kthena-router/backend"
"github.com/volcano-sh/kthena/pkg/kthena-router/utils"
)

Expand Down Expand Up @@ -59,28 +57,24 @@ func createTestModelServer(namespace, name string, engine aiv1alpha1.InferenceEn
}
}

// Helper function to setup mock for backend calls
func setupMockBackend() *gomonkey.Patches {
patch := gomonkey.NewPatches()
patch.ApplyFunc(backend.GetPodMetrics, func(backend string, pod *corev1.Pod, previousHistogram map[string]*dto.Histogram) (map[string]float64, map[string]*dto.Histogram) {
return map[string]float64{
utils.GPUCacheUsage: 0.5,
utils.RequestWaitingNum: 10,
utils.RequestRunningNum: 5,
}, map[string]*dto.Histogram{}
})
patch.ApplyFunc(backend.GetPodModels, func(backend string, pod *corev1.Pod) ([]string, error) {
return []string{"test-model"}, nil
})
return patch
func newStoreWithMockBackend() *store {
return New(WithPodRuntimeInspector(&fakePodRuntimeInspector{
metricsFn: func(_ string, _ *corev1.Pod, _ map[string]*dto.Histogram) (map[string]float64, map[string]*dto.Histogram) {
return map[string]float64{
utils.GPUCacheUsage: 0.5,
utils.RequestWaitingNum: 10,
utils.RequestRunningNum: 5,
}, nil
},
modelsFn: func(_ string, _ *corev1.Pod) ([]string, error) {
return []string{"test-model"}, nil
},
})).(*store)
}

// Test Case 1: ModelServer added first, then Pod
func TestStore_AddModelServerFirst_ThenPod(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

// Step 1: Add ModelServer first
ms := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
Expand Down Expand Up @@ -117,10 +111,7 @@ func TestStore_AddModelServerFirst_ThenPod(t *testing.T) {
// Test Case 2: Pod added first, then ModelServer
// Note: Current implementation expects ModelServer to exist before Pod
func TestStore_AddPodFirst_ThenModelServer(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

ms := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
pod := createTestPod("default", "pod1")
Expand Down Expand Up @@ -155,10 +146,7 @@ func TestStore_AddPodFirst_ThenModelServer(t *testing.T) {

// Test Case 3: Multiple Pods added with ModelServer
func TestStore_MultiplePods_ThenModelServer(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

ms := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
pod1 := createTestPod("default", "pod1")
Expand Down Expand Up @@ -198,10 +186,7 @@ func TestStore_MultiplePods_ThenModelServer(t *testing.T) {

// Test Case 4: ModelServer with multiple Pods added together
func TestStore_ModelServerWithMultiplePods_AddedTogether(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

ms := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
pod1 := createTestPod("default", "pod1")
Expand Down Expand Up @@ -233,10 +218,7 @@ func TestStore_ModelServerWithMultiplePods_AddedTogether(t *testing.T) {

// Test Case 5: Pod belongs to multiple ModelServers
func TestStore_PodBelongsToMultipleModelServers(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

ms1 := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
ms2 := createTestModelServer("default", "model2", aiv1alpha1.VLLM)
Expand Down Expand Up @@ -272,10 +254,7 @@ func TestStore_PodBelongsToMultipleModelServers(t *testing.T) {

// Test Case 6: Pod with multiple ModelServers
func TestStore_PodWithMultipleModelServers_ThenAddModelServers(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

ms1 := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
ms2 := createTestModelServer("default", "model2", aiv1alpha1.VLLM)
Expand Down Expand Up @@ -318,10 +297,7 @@ func TestStore_PodWithMultipleModelServers_ThenAddModelServers(t *testing.T) {

// Test Case 7: Update operations - changing Pod's ModelServer associations
func TestStore_UpdatePodModelServerAssociations(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

ms1 := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
ms2 := createTestModelServer("default", "model2", aiv1alpha1.VLLM)
Expand Down Expand Up @@ -364,10 +340,7 @@ func TestStore_UpdatePodModelServerAssociations(t *testing.T) {

// Test Case 8: Interleaved operations
func TestStore_InterleavedOperations(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

ms1 := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
ms2 := createTestModelServer("default", "model2", aiv1alpha1.VLLM)
Expand Down Expand Up @@ -422,10 +395,7 @@ func TestStore_InterleavedOperations(t *testing.T) {

// Test Case 9: Deletion scenarios
func TestStore_DeletionScenarios(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

ms1 := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
ms2 := createTestModelServer("default", "model2", aiv1alpha1.VLLM)
Expand Down Expand Up @@ -515,10 +485,7 @@ func TestStore_EdgeCases(t *testing.T) {

// Test Case 11: random operations (simulated)
func TestStore_RandomOperations(t *testing.T) {
patch := setupMockBackend()
defer patch.Reset()

s := New().(*store)
s := newStoreWithMockBackend()

// Simulate rapid add/update operations that might happen concurrently
ms := createTestModelServer("default", "model1", aiv1alpha1.VLLM)
Expand Down
Loading
Loading