From c37c6967bfb3673086676a6597d36a49eaf1d58d Mon Sep 17 00:00:00 2001 From: katara-Jayprakash Date: Thu, 26 Mar 2026 14:40:56 +0530 Subject: [PATCH 1/6] make ControllerRevision immutable for existing revision keys Signed-off-by: katara-Jayprakash --- .../controller/model_serving_controller.go | 13 +++- .../model_serving_controller_test.go | 60 +++++++++++++++++++ .../utils/controller_revision.go | 14 +---- 3 files changed, 74 insertions(+), 13 deletions(-) diff --git a/pkg/model-serving-controller/controller/model_serving_controller.go b/pkg/model-serving-controller/controller/model_serving_controller.go index 047a7304e..61ef7729c 100644 --- a/pkg/model-serving-controller/controller/model_serving_controller.go +++ b/pkg/model-serving-controller/controller/model_serving_controller.go @@ -789,6 +789,10 @@ func (c *ModelServingController) manageRole(ctx context.Context, ms *workloadv1a // Deleting ServingGroup will be recreated after the deletion is complete, so there is no need to scale the roles continue } + effectiveRevision := newRevision + if servingGroup.Revision != "" { + effectiveRevision = servingGroup.Revision + } _, servingGroupOrdinal := utils.GetParentNameAndOrdinal(servingGroup.Name) isPartitionProtected := partition > 0 && index < partition @@ -917,6 +921,11 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv // It handles both scale up and scale down operations for the role func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, servingGroupOrdinal int, newRevision string) { // TODO: add podGroup update after gang scheduler finished + effectiveRevision := newRevision + if revision, ok := c.store.GetServingGroupRevision(utils.GetNamespaceName(ms), groupName); ok && revision != "" { + effectiveRevision = revision + } + // Get all replicas of a role from storage, for example, prefill-0, prefill-1... 
roleList, err := c.store.GetRoleList(utils.GetNamespaceName(ms), groupName, targetRole.Name) if err != nil { @@ -950,7 +959,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor if len(pods) < expectedPods { klog.V(2).Infof("manageRoleReplicas: role %s/%s in ServingGroup %s is missing pods (%d/%d), recreating", targetRole.Name, roleObj.Name, groupName, len(pods), expectedPods) _, roleIndex := utils.GetParentNameAndOrdinal(roleObj.Name) - if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, newRevision); err != nil { + if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, effectiveRevision); err != nil { klog.Errorf("manageRoleReplicas: failed to recreate pods for role %s/%s in ServingGroup %s: %v", targetRole.Name, roleObj.Name, groupName, err) } } @@ -959,7 +968,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor // Determine whether it is a scale-up or scale-down scenario if len(roleList) < expectedCount { klog.V(2).Infof("manageRoleReplicas: scaling UP role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount) - c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, newRevision) + c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, effectiveRevision) } else if len(roleList) > expectedCount { klog.V(2).Infof("manageRoleReplicas: scaling DOWN role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount) c.scaleDownRoles(ctx, ms, groupName, targetRole, roleList, expectedCount) diff --git a/pkg/model-serving-controller/controller/model_serving_controller_test.go b/pkg/model-serving-controller/controller/model_serving_controller_test.go index f0271141b..c943e6da7 100644 --- 
a/pkg/model-serving-controller/controller/model_serving_controller_test.go +++ b/pkg/model-serving-controller/controller/model_serving_controller_test.go @@ -2557,6 +2557,66 @@ func TestManageRoleReplicas(t *testing.T) { } } +func TestManageRoleReplicas_RecreateUsesServingGroupRevision(t *testing.T) { + kubeClient := kubefake.NewSimpleClientset() + kthenaClient := kthenafake.NewSimpleClientset() + volcanoClient := volcanofake.NewSimpleClientset() + apiextClient := apiextfake.NewSimpleClientset(testhelper.CreatePodGroupCRD()) + + controller, err := NewModelServingController(kubeClient, kthenaClient, volcanoClient, apiextClient) + assert.NoError(t, err) + + roleName := "default" + oldRevision := "revision-v1" + newRevision := "revision-v2" + + ms := &workloadv1alpha1.ModelServing{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-manage-role-group-rev", + UID: types.UID("ms-uid-group-rev"), + }, + Spec: workloadv1alpha1.ModelServingSpec{ + Replicas: ptr.To[int32](1), + Template: workloadv1alpha1.ServingGroup{ + Roles: []workloadv1alpha1.Role{ + { + Name: roleName, + Replicas: ptr.To[int32](1), + WorkerReplicas: 0, + EntryTemplate: workloadv1alpha1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "entry-container", + Image: "test-image:latest", + }}, + }, + }, + }, + }, + }, + RecoveryPolicy: workloadv1alpha1.RoleRecreate, + }, + } + + groupName := utils.GenerateServingGroupName(ms.Name, 0) + controller.store.AddServingGroup(utils.GetNamespaceName(ms), 0, oldRevision) + controller.store.AddRole(utils.GetNamespaceName(ms), groupName, roleName, utils.GenerateRoleID(roleName, 0), oldRevision) + + controller.manageRoleReplicas(context.Background(), ms, groupName, ms.Spec.Template.Roles[0], 0, newRevision) + + selector := labels.SelectorFromSet(map[string]string{ + workloadv1alpha1.GroupNameLabelKey: groupName, + workloadv1alpha1.RoleLabelKey: roleName, + }) + pods, err := 
kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector.String()}) + assert.NoError(t, err) + if assert.Len(t, pods.Items, 1) { + assert.Equal(t, oldRevision, pods.Items[0].Labels[workloadv1alpha1.RevisionLabelKey], + "recreated pod for existing serving group should keep the serving group revision") + } +} + // TestScaleDownServingGroups tests the scaleDownServingGroups function with various scenarios func TestScaleDownServingGroups(t *testing.T) { tests := []struct { diff --git a/pkg/model-serving-controller/utils/controller_revision.go b/pkg/model-serving-controller/utils/controller_revision.go index 4362ad1f6..af6ffba4a 100644 --- a/pkg/model-serving-controller/utils/controller_revision.go +++ b/pkg/model-serving-controller/utils/controller_revision.go @@ -56,18 +56,10 @@ func CreateControllerRevision(ctx context.Context, client kubernetes.Interface, controllerRevisionName := GenerateControllerRevisionName(ms.Name, revision) existing, err := client.AppsV1().ControllerRevisions(ms.Namespace).Get(ctx, controllerRevisionName, metav1.GetOptions{}) if err == nil { - // If already exists, check if data has changed + // Existing revision snapshots are immutable. Keep the first payload for a + // given revision key to preserve deterministic rollback/recovery behavior. 
if string(existing.Data.Raw) != string(data) { - existing.Data = runtime.RawExtension{ - Raw: data, - } - existing.Revision++ - updated, updateErr := client.AppsV1().ControllerRevisions(ms.Namespace).Update(ctx, existing, metav1.UpdateOptions{}) - if updateErr != nil { - return nil, fmt.Errorf("failed to update ControllerRevision: %v", updateErr) - } - klog.V(4).Infof("Updated ControllerRevision %s/%s with revision %s", ms.Namespace, controllerRevisionName, revision) - return updated, nil + klog.Warningf("ControllerRevision %s/%s already exists with different payload for revision %s; preserving existing snapshot", ms.Namespace, controllerRevisionName, revision) } return existing, nil } else if !apierrors.IsNotFound(err) { From 94f4bc922ffa2467262d76168a95b74e265d2f94 Mon Sep 17 00:00:00 2001 From: katara-Jayprakash Date: Thu, 26 Mar 2026 14:41:54 +0530 Subject: [PATCH 2/6] ensure pod recreation/role scale-up in existing groups keeps group revision Signed-off-by: katara-Jayprakash --- .../utils/controller_revision_test.go | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/pkg/model-serving-controller/utils/controller_revision_test.go b/pkg/model-serving-controller/utils/controller_revision_test.go index ac6d9530c..a92a69274 100644 --- a/pkg/model-serving-controller/utils/controller_revision_test.go +++ b/pkg/model-serving-controller/utils/controller_revision_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" kubefake "k8s.io/client-go/kubernetes/fake" @@ -107,6 +108,78 @@ func TestGetControllerRevision(t *testing.T) { assert.Equal(t, "revision-v2", cr.Labels[ControllerRevisionRevisionLabelKey]) } +// TestCreateControllerRevision_ExistingRevisionShouldRemainImmutable verifies that +// creating a ControllerRevision with an existing revision key does not mutate +// previously stored template data. 
+func TestCreateControllerRevision_ExistingRevisionShouldRemainImmutable(t *testing.T) { + ctx := context.Background() + client := kubefake.NewSimpleClientset() + + roleReplicas := int32(1) + ms := &workloadv1alpha1.ModelServing{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ms", + Namespace: "default", + UID: "test-uid", + }, + TypeMeta: metav1.TypeMeta{ + APIVersion: "workload.kthena.io/v1alpha1", + Kind: "ModelServing", + }, + } + + templateV1 := []workloadv1alpha1.Role{ + { + Name: "prefill", + Replicas: &roleReplicas, + EntryTemplate: workloadv1alpha1.PodTemplateSpec{ + Spec: corev1.PodSpec{}, + }, + }, + } + templateV1[0].EntryTemplate.Spec.Containers = []corev1.Container{{ + Name: "test-container", + Image: "nginx:1.28.2", + }} + + templateV2 := []workloadv1alpha1.Role{ + { + Name: "prefill", + Replicas: &roleReplicas, + EntryTemplate: workloadv1alpha1.PodTemplateSpec{ + Spec: corev1.PodSpec{}, + }, + }, + } + templateV2[0].EntryTemplate.Spec.Containers = []corev1.Container{{ + Name: "test-container", + Image: "nginx:alpine", + }} + + created, err := CreateControllerRevision(ctx, client, ms, "revision-v1", templateV1) + assert.NoError(t, err) + assert.NotNil(t, created) + assert.Equal(t, int64(1), created.Revision) + + _, err = CreateControllerRevision(ctx, client, ms, "revision-v1", templateV2) + assert.NoError(t, err) + + stored, err := GetControllerRevision(ctx, client, ms, "revision-v1") + assert.NoError(t, err) + assert.NotNil(t, stored) + + recovered, err := GetRolesFromControllerRevision(stored) + assert.NoError(t, err) + if assert.Len(t, recovered, 1) { + if assert.Len(t, recovered[0].EntryTemplate.Spec.Containers, 1) { + assert.Equal(t, "nginx:1.28.2", recovered[0].EntryTemplate.Spec.Containers[0].Image, + "existing revision payload should not be overwritten") + } + } + assert.Equal(t, int64(1), stored.Revision, + "existing revision object should remain unchanged for same revision key") +} + // 
TestCleanupOldControllerRevisions_PreservesCurrentAndUpdateRevisions tests that // CleanupOldControllerRevisions always preserves CurrentRevision and UpdateRevision func TestCleanupOldControllerRevisions_PreservesCurrentAndUpdateRevisions(t *testing.T) { From 3cb3dbbd3e7dc2db69001f7185d9aa44af8328cd Mon Sep 17 00:00:00 2001 From: katara-Jayprakash Date: Thu, 26 Mar 2026 15:11:31 +0530 Subject: [PATCH 3/6] removed the redundant effectiveRevision Signed-off-by: katara-Jayprakash --- .../controller/model_serving_controller.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/model-serving-controller/controller/model_serving_controller.go b/pkg/model-serving-controller/controller/model_serving_controller.go index 61ef7729c..2c7d5b055 100644 --- a/pkg/model-serving-controller/controller/model_serving_controller.go +++ b/pkg/model-serving-controller/controller/model_serving_controller.go @@ -789,10 +789,6 @@ func (c *ModelServingController) manageRole(ctx context.Context, ms *workloadv1a // Deleting ServingGroup will be recreated after the deletion is complete, so there is no need to scale the roles continue } - effectiveRevision := newRevision - if servingGroup.Revision != "" { - effectiveRevision = servingGroup.Revision - } _, servingGroupOrdinal := utils.GetParentNameAndOrdinal(servingGroup.Name) isPartitionProtected := partition > 0 && index < partition From a7cb67b48bda7a17b6f518bc1af0f5d80753b64f Mon Sep 17 00:00:00 2001 From: katara-Jayprakash Date: Thu, 26 Mar 2026 15:14:41 +0530 Subject: [PATCH 4/6] ControllerRevision payload mismatch log to error Signed-off-by: katara-Jayprakash --- pkg/model-serving-controller/utils/controller_revision.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/model-serving-controller/utils/controller_revision.go b/pkg/model-serving-controller/utils/controller_revision.go index af6ffba4a..b5e204ec3 100644 --- a/pkg/model-serving-controller/utils/controller_revision.go +++ 
b/pkg/model-serving-controller/utils/controller_revision.go @@ -59,7 +59,7 @@ func CreateControllerRevision(ctx context.Context, client kubernetes.Interface, // Existing revision snapshots are immutable. Keep the first payload for a // given revision key to preserve deterministic rollback/recovery behavior. if string(existing.Data.Raw) != string(data) { - klog.Warningf("ControllerRevision %s/%s already exists with different payload for revision %s; preserving existing snapshot", ms.Namespace, controllerRevisionName, revision) + klog.Errorf("ControllerRevision %s/%s already exists with different payload for revision %s; preserving existing snapshot", ms.Namespace, controllerRevisionName, revision) } return existing, nil } else if !apierrors.IsNotFound(err) { From 943c22d14706ff7d029b76f15778ddb7e8d36487 Mon Sep 17 00:00:00 2001 From: katara-Jayprakash Date: Thu, 26 Mar 2026 15:26:54 +0530 Subject: [PATCH 5/6] change the parameter naming convention to something like effectiveRevision Signed-off-by: katara-Jayprakash --- .../controller/model_serving_controller.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/model-serving-controller/controller/model_serving_controller.go b/pkg/model-serving-controller/controller/model_serving_controller.go index 2c7d5b055..a2771f72d 100644 --- a/pkg/model-serving-controller/controller/model_serving_controller.go +++ b/pkg/model-serving-controller/controller/model_serving_controller.go @@ -875,7 +875,7 @@ func (c *ModelServingController) scaleDownRoles(ctx context.Context, ms *workloa // scaleUpRoles handles Role scaling up. // It creates new Roles with increasing indices starting from the current max index + 1. 
-func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, newRevision string) { +func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, effectiveRevision string) { startingIndex := 0 if len(roleList) > 0 { _, ordinal := utils.GetParentNameAndOrdinal(roleList[len(roleList)-1].Name) @@ -899,13 +899,13 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv for i := 0; i < toCreate; i++ { newIndex := startingIndex + i // Create pods for role - err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, newRevision) + err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, effectiveRevision) if err != nil { klog.Errorf("create role %s for ServingGroup %s failed: %v", utils.GenerateRoleID(targetRole.Name, newIndex), groupName, err) } else { // Insert new Role to global storage roleID := utils.GenerateRoleID(targetRole.Name, newIndex) - c.store.AddRole(utils.GetNamespaceName(ms), groupName, targetRole.Name, roleID, newRevision) + c.store.AddRole(utils.GetNamespaceName(ms), groupName, targetRole.Name, roleID, effectiveRevision) // Emit event for new role entering Creating state message := fmt.Sprintf("Role %s/%s in ServingGroup %s is now Creating", targetRole.Name, roleID, groupName) c.emitRoleStatusEvent(ms, corev1.EventTypeNormal, "RoleCreating", message) From 61381d61e74be8b5ecf4d020b20f7b44adb2f0d8 Mon Sep 17 00:00:00 2001 From: katara-Jayprakash Date: Fri, 3 Apr 2026 23:38:41 +0530 Subject: [PATCH 6/6] Fixing the e2e test according to review Signed-off-by: katara-Jayprakash --- .../controller/model_serving_controller.go | 17 ++-- 
.../model_serving_controller_test.go | 85 +++++++++++++++++++ 2 files changed, 94 insertions(+), 8 deletions(-) diff --git a/pkg/model-serving-controller/controller/model_serving_controller.go b/pkg/model-serving-controller/controller/model_serving_controller.go index a2771f72d..b98d8c5a5 100644 --- a/pkg/model-serving-controller/controller/model_serving_controller.go +++ b/pkg/model-serving-controller/controller/model_serving_controller.go @@ -875,7 +875,7 @@ func (c *ModelServingController) scaleDownRoles(ctx context.Context, ms *workloa // scaleUpRoles handles Role scaling up. // It creates new Roles with increasing indices starting from the current max index + 1. -func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, effectiveRevision string) { +func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, revision string) { startingIndex := 0 if len(roleList) > 0 { _, ordinal := utils.GetParentNameAndOrdinal(roleList[len(roleList)-1].Name) @@ -899,13 +899,13 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv for i := 0; i < toCreate; i++ { newIndex := startingIndex + i // Create pods for role - err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, effectiveRevision) + err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, revision) if err != nil { klog.Errorf("create role %s for ServingGroup %s failed: %v", utils.GenerateRoleID(targetRole.Name, newIndex), groupName, err) } else { // Insert new Role to global storage roleID := utils.GenerateRoleID(targetRole.Name, newIndex) - c.store.AddRole(utils.GetNamespaceName(ms), groupName, 
targetRole.Name, roleID, effectiveRevision) + c.store.AddRole(utils.GetNamespaceName(ms), groupName, targetRole.Name, roleID, revision) // Emit event for new role entering Creating state message := fmt.Sprintf("Role %s/%s in ServingGroup %s is now Creating", targetRole.Name, roleID, groupName) c.emitRoleStatusEvent(ms, corev1.EventTypeNormal, "RoleCreating", message) @@ -917,9 +917,10 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv // It handles both scale up and scale down operations for the role func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, servingGroupOrdinal int, newRevision string) { // TODO: add podGroup update after gang scheduler finished - effectiveRevision := newRevision - if revision, ok := c.store.GetServingGroupRevision(utils.GetNamespaceName(ms), groupName); ok && revision != "" { - effectiveRevision = revision + // Use the stored revision for existing groups (partition-protected), otherwise use newRevision + revision := newRevision + if storedRevision, ok := c.store.GetServingGroupRevision(utils.GetNamespaceName(ms), groupName); ok && storedRevision != "" { + revision = storedRevision } // Get all replicas of a role from storage, for example, prefill-0, prefill-1... 
@@ -955,7 +956,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor if len(pods) < expectedPods { klog.V(2).Infof("manageRoleReplicas: role %s/%s in ServingGroup %s is missing pods (%d/%d), recreating", targetRole.Name, roleObj.Name, groupName, len(pods), expectedPods) _, roleIndex := utils.GetParentNameAndOrdinal(roleObj.Name) - if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, effectiveRevision); err != nil { + if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, revision); err != nil { klog.Errorf("manageRoleReplicas: failed to recreate pods for role %s/%s in ServingGroup %s: %v", targetRole.Name, roleObj.Name, groupName, err) } } @@ -964,7 +965,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor // Determine whether it is a scale-up or scale-down scenario if len(roleList) < expectedCount { klog.V(2).Infof("manageRoleReplicas: scaling UP role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount) - c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, effectiveRevision) + c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, revision) } else if len(roleList) > expectedCount { klog.V(2).Infof("manageRoleReplicas: scaling DOWN role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount) c.scaleDownRoles(ctx, ms, groupName, targetRole, roleList, expectedCount) diff --git a/pkg/model-serving-controller/controller/model_serving_controller_test.go b/pkg/model-serving-controller/controller/model_serving_controller_test.go index c943e6da7..59f31e24c 100644 --- a/pkg/model-serving-controller/controller/model_serving_controller_test.go +++ b/pkg/model-serving-controller/controller/model_serving_controller_test.go @@ -2617,6 +2617,91 @@ func 
TestManageRoleReplicas_RecreateUsesServingGroupRevision(t *testing.T) { } } +// TestManageRoleReplicas_PartitionedScaleUp verifies that during a partitioned rolling update, +// group 0 (protected by partition=1) uses old revision while group 1 uses new revision. +func TestManageRoleReplicas_PartitionedScaleUp(t *testing.T) { + kubeClient := kubefake.NewSimpleClientset() + kthenaClient := kthenafake.NewSimpleClientset() + volcanoClient := volcanofake.NewSimpleClientset() + apiextClient := apiextfake.NewSimpleClientset(testhelper.CreatePodGroupCRD()) + + controller, err := NewModelServingController(kubeClient, kthenaClient, volcanoClient, apiextClient) + assert.NoError(t, err) + + roleName := "default" + oldRevision := "revision-v1" + newRevision := "revision-v2" + + ms := &workloadv1alpha1.ModelServing{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-partition-scale", + UID: types.UID("ms-uid-partition"), + }, + Spec: workloadv1alpha1.ModelServingSpec{ + Replicas: ptr.To[int32](2), + Template: workloadv1alpha1.ServingGroup{ + Roles: []workloadv1alpha1.Role{ + { + Name: roleName, + Replicas: ptr.To[int32](1), + WorkerReplicas: 0, + EntryTemplate: workloadv1alpha1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "entry-container", + Image: "test-image:latest", + }}, + }, + }, + }, + }, + }, + RecoveryPolicy: workloadv1alpha1.RoleRecreate, + }, + } + + // Setup: group 0 exists with old revision (protected by partition=1) + group0Name := utils.GenerateServingGroupName(ms.Name, 0) + controller.store.AddServingGroup(utils.GetNamespaceName(ms), 0, oldRevision) + controller.store.AddRole(utils.GetNamespaceName(ms), group0Name, roleName, utils.GenerateRoleID(roleName, 0), oldRevision) + + // Setup: group 1 exists with new revision (above partition, should use new revision) + group1Name := utils.GenerateServingGroupName(ms.Name, 1) + controller.store.AddServingGroup(utils.GetNamespaceName(ms), 1, newRevision) + 
controller.store.AddRole(utils.GetNamespaceName(ms), group1Name, roleName, utils.GenerateRoleID(roleName, 0), newRevision) + + // Scale up roles in group 0 - should use old revision (partition-protected) + controller.manageRoleReplicas(context.Background(), ms, group0Name, ms.Spec.Template.Roles[0], 0, newRevision) + + // Scale up roles in group 1 - should use new revision + controller.manageRoleReplicas(context.Background(), ms, group1Name, ms.Spec.Template.Roles[0], 1, newRevision) + + // Verify group 0 pods use old revision + selector0 := labels.SelectorFromSet(map[string]string{ + workloadv1alpha1.GroupNameLabelKey: group0Name, + workloadv1alpha1.RoleLabelKey: roleName, + }) + pods0, err := kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector0.String()}) + assert.NoError(t, err) + if assert.Len(t, pods0.Items, 1) { + assert.Equal(t, oldRevision, pods0.Items[0].Labels[workloadv1alpha1.RevisionLabelKey], + "group 0 (partition-protected) should use old revision") + } + + // Verify group 1 pods use new revision + selector1 := labels.SelectorFromSet(map[string]string{ + workloadv1alpha1.GroupNameLabelKey: group1Name, + workloadv1alpha1.RoleLabelKey: roleName, + }) + pods1, err := kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector1.String()}) + assert.NoError(t, err) + if assert.Len(t, pods1.Items, 1) { + assert.Equal(t, newRevision, pods1.Items[0].Labels[workloadv1alpha1.RevisionLabelKey], + "group 1 (above partition) should use new revision") + } +} + // TestScaleDownServingGroups tests the scaleDownServingGroups function with various scenarios func TestScaleDownServingGroups(t *testing.T) { tests := []struct {