diff --git a/pkg/model-serving-controller/controller/model_serving_controller.go b/pkg/model-serving-controller/controller/model_serving_controller.go index 8bafd1a3b..92f393369 100644 --- a/pkg/model-serving-controller/controller/model_serving_controller.go +++ b/pkg/model-serving-controller/controller/model_serving_controller.go @@ -875,7 +875,7 @@ func (c *ModelServingController) scaleDownRoles(ctx context.Context, ms *workloa // scaleUpRoles handles Role scaling up. // It creates new Roles with increasing indices starting from the current max index + 1. -func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, newRevision string) { +func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, revision string) { startingIndex := 0 if len(roleList) > 0 { _, ordinal := utils.GetParentNameAndOrdinal(roleList[len(roleList)-1].Name) @@ -899,13 +899,13 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv for i := 0; i < toCreate; i++ { newIndex := startingIndex + i // Create pods for role - err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, newRevision) + err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, revision) if err != nil { klog.Errorf("create role %s for ServingGroup %s failed: %v", utils.GenerateRoleID(targetRole.Name, newIndex), groupName, err) } else { // Insert new Role to global storage roleID := utils.GenerateRoleID(targetRole.Name, newIndex) - c.store.AddRole(utils.GetNamespaceName(ms), groupName, targetRole.Name, roleID, newRevision) + c.store.AddRole(utils.GetNamespaceName(ms), groupName, targetRole.Name, roleID, revision) // Emit event for new role entering Creating state message := fmt.Sprintf("Role %s/%s in ServingGroup %s is now Creating", targetRole.Name, roleID, groupName) c.emitRoleStatusEvent(ms, corev1.EventTypeNormal, "RoleCreating", message) @@ -917,6 +917,12 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv // It handles both scale up and scale down operations for the role func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, servingGroupOrdinal int, newRevision string) { // TODO: add podGroup update after gang scheduler finished + // Use the stored revision for existing groups (partition-protected), otherwise use newRevision + revision := newRevision + if storedRevision, ok := c.store.GetServingGroupRevision(utils.GetNamespaceName(ms), groupName); ok && storedRevision != "" { + revision = storedRevision + } + // Get all replicas of a role from storage, for example, prefill-0, prefill-1... roleList, err := c.store.GetRoleList(utils.GetNamespaceName(ms), groupName, targetRole.Name) if err != nil { @@ -950,7 +956,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor if len(pods) < expectedPods { klog.V(2).Infof("manageRoleReplicas: role %s/%s in ServingGroup %s is missing pods (%d/%d), recreating", targetRole.Name, roleObj.Name, groupName, len(pods), expectedPods) _, roleIndex := utils.GetParentNameAndOrdinal(roleObj.Name) - if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, newRevision); err != nil { + if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, revision); err != nil { klog.Errorf("manageRoleReplicas: failed to recreate pods for role %s/%s in ServingGroup %s: %v", targetRole.Name, roleObj.Name, groupName, err) } } @@ -959,7 +965,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor // Determine whether it is a scale-up or scale-down scenario if len(roleList) < expectedCount { klog.V(2).Infof("manageRoleReplicas: scaling UP role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount) - c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, newRevision) + c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, revision) } else if len(roleList) > expectedCount { klog.V(2).Infof("manageRoleReplicas: scaling DOWN role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount) c.scaleDownRoles(ctx, ms, groupName, targetRole, roleList, expectedCount) diff --git a/pkg/model-serving-controller/controller/model_serving_controller_test.go b/pkg/model-serving-controller/controller/model_serving_controller_test.go index f0271141b..59f31e24c 100644 --- a/pkg/model-serving-controller/controller/model_serving_controller_test.go +++ b/pkg/model-serving-controller/controller/model_serving_controller_test.go @@ -2557,6 +2557,151 @@ func TestManageRoleReplicas(t *testing.T) { } } +func TestManageRoleReplicas_RecreateUsesServingGroupRevision(t *testing.T) { + kubeClient := kubefake.NewSimpleClientset() + kthenaClient := kthenafake.NewSimpleClientset() + volcanoClient := volcanofake.NewSimpleClientset() + apiextClient := apiextfake.NewSimpleClientset(testhelper.CreatePodGroupCRD()) + + controller, err := NewModelServingController(kubeClient, kthenaClient, volcanoClient, apiextClient) + assert.NoError(t, err) + + roleName := "default" + oldRevision := "revision-v1" + newRevision := "revision-v2" + + ms := &workloadv1alpha1.ModelServing{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-manage-role-group-rev", + UID: types.UID("ms-uid-group-rev"), + }, + Spec: workloadv1alpha1.ModelServingSpec{ + Replicas: ptr.To[int32](1), + Template: workloadv1alpha1.ServingGroup{ + Roles: []workloadv1alpha1.Role{ + { + Name: roleName, + Replicas: ptr.To[int32](1), + WorkerReplicas: 0, + EntryTemplate: workloadv1alpha1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "entry-container", + Image: "test-image:latest", + }}, + }, + }, + }, + }, + }, + RecoveryPolicy: workloadv1alpha1.RoleRecreate, + }, + } + + groupName := utils.GenerateServingGroupName(ms.Name, 0) + controller.store.AddServingGroup(utils.GetNamespaceName(ms), 0, oldRevision) + controller.store.AddRole(utils.GetNamespaceName(ms), groupName, roleName, utils.GenerateRoleID(roleName, 0), oldRevision) + + controller.manageRoleReplicas(context.Background(), ms, groupName, ms.Spec.Template.Roles[0], 0, newRevision) + + selector := labels.SelectorFromSet(map[string]string{ + workloadv1alpha1.GroupNameLabelKey: groupName, + workloadv1alpha1.RoleLabelKey: roleName, + }) + pods, err := kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector.String()}) + assert.NoError(t, err) + if assert.Len(t, pods.Items, 1) { + assert.Equal(t, oldRevision, pods.Items[0].Labels[workloadv1alpha1.RevisionLabelKey], + "recreated pod for existing serving group should keep the serving group revision") + } +} + +// TestManageRoleReplicas_PartitionedScaleUp verifies that during a partitioned rolling update, +// group 0 (protected by partition=1) uses old revision while group 1 uses new revision. +func TestManageRoleReplicas_PartitionedScaleUp(t *testing.T) { + kubeClient := kubefake.NewSimpleClientset() + kthenaClient := kthenafake.NewSimpleClientset() + volcanoClient := volcanofake.NewSimpleClientset() + apiextClient := apiextfake.NewSimpleClientset(testhelper.CreatePodGroupCRD()) + + controller, err := NewModelServingController(kubeClient, kthenaClient, volcanoClient, apiextClient) + assert.NoError(t, err) + + roleName := "default" + oldRevision := "revision-v1" + newRevision := "revision-v2" + + ms := &workloadv1alpha1.ModelServing{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-partition-scale", + UID: types.UID("ms-uid-partition"), + }, + Spec: workloadv1alpha1.ModelServingSpec{ + Replicas: ptr.To[int32](2), + Template: workloadv1alpha1.ServingGroup{ + Roles: []workloadv1alpha1.Role{ + { + Name: roleName, + Replicas: ptr.To[int32](1), + WorkerReplicas: 0, + EntryTemplate: workloadv1alpha1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "entry-container", + Image: "test-image:latest", + }}, + }, + }, + }, + }, + }, + RecoveryPolicy: workloadv1alpha1.RoleRecreate, + }, + } + + // Setup: group 0 exists with old revision (protected by partition=1) + group0Name := utils.GenerateServingGroupName(ms.Name, 0) + controller.store.AddServingGroup(utils.GetNamespaceName(ms), 0, oldRevision) + controller.store.AddRole(utils.GetNamespaceName(ms), group0Name, roleName, utils.GenerateRoleID(roleName, 0), oldRevision) + + // Setup: group 1 exists with new revision (above partition, should use new revision) + group1Name := utils.GenerateServingGroupName(ms.Name, 1) + controller.store.AddServingGroup(utils.GetNamespaceName(ms), 1, newRevision) + controller.store.AddRole(utils.GetNamespaceName(ms), group1Name, roleName, utils.GenerateRoleID(roleName, 0), newRevision) + + // Scale up roles in group 0 - should use old revision (partition-protected) + controller.manageRoleReplicas(context.Background(), ms, group0Name, ms.Spec.Template.Roles[0], 0, newRevision) + + // Scale up roles in group 1 - should use new revision + controller.manageRoleReplicas(context.Background(), ms, group1Name, ms.Spec.Template.Roles[0], 1, newRevision) + + // Verify group 0 pods use old revision + selector0 := labels.SelectorFromSet(map[string]string{ + workloadv1alpha1.GroupNameLabelKey: group0Name, + workloadv1alpha1.RoleLabelKey: roleName, + }) + pods0, err := kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector0.String()}) + assert.NoError(t, err) + if assert.Len(t, pods0.Items, 1) { + assert.Equal(t, oldRevision, pods0.Items[0].Labels[workloadv1alpha1.RevisionLabelKey], + "group 0 (partition-protected) should use old revision") + } + + // Verify group 1 pods use new revision + selector1 := labels.SelectorFromSet(map[string]string{ + workloadv1alpha1.GroupNameLabelKey: group1Name, + workloadv1alpha1.RoleLabelKey: roleName, + }) + pods1, err := kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector1.String()}) + assert.NoError(t, err) + if assert.Len(t, pods1.Items, 1) { + assert.Equal(t, newRevision, pods1.Items[0].Labels[workloadv1alpha1.RevisionLabelKey], + "group 1 (above partition) should use new revision") + } +} + // TestScaleDownServingGroups tests the scaleDownServingGroups function with various scenarios func TestScaleDownServingGroups(t *testing.T) { tests := []struct { diff --git a/pkg/model-serving-controller/utils/controller_revision.go b/pkg/model-serving-controller/utils/controller_revision.go index 4362ad1f6..b5e204ec3 100644 --- a/pkg/model-serving-controller/utils/controller_revision.go +++ b/pkg/model-serving-controller/utils/controller_revision.go @@ -56,18 +56,10 @@ func CreateControllerRevision(ctx context.Context, client kubernetes.Interface, controllerRevisionName := GenerateControllerRevisionName(ms.Name, revision) existing, err := client.AppsV1().ControllerRevisions(ms.Namespace).Get(ctx, controllerRevisionName, metav1.GetOptions{}) if err == nil { - // If already exists, check if data has changed + // Existing revision snapshots are immutable. Keep the first payload for a + // given revision key to preserve deterministic rollback/recovery behavior. if string(existing.Data.Raw) != string(data) { - existing.Data = runtime.RawExtension{ - Raw: data, - } - existing.Revision++ - updated, updateErr := client.AppsV1().ControllerRevisions(ms.Namespace).Update(ctx, existing, metav1.UpdateOptions{}) - if updateErr != nil { - return nil, fmt.Errorf("failed to update ControllerRevision: %v", updateErr) - } - klog.V(4).Infof("Updated ControllerRevision %s/%s with revision %s", ms.Namespace, controllerRevisionName, revision) - return updated, nil + klog.Errorf("ControllerRevision %s/%s already exists with different payload for revision %s; preserving existing snapshot", ms.Namespace, controllerRevisionName, revision) } return existing, nil } else if !apierrors.IsNotFound(err) { diff --git a/pkg/model-serving-controller/utils/controller_revision_test.go b/pkg/model-serving-controller/utils/controller_revision_test.go index ac6d9530c..a92a69274 100644 --- a/pkg/model-serving-controller/utils/controller_revision_test.go +++ b/pkg/model-serving-controller/utils/controller_revision_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" kubefake "k8s.io/client-go/kubernetes/fake" @@ -107,6 +108,78 @@ func TestGetControllerRevision(t *testing.T) { assert.Equal(t, "revision-v2", cr.Labels[ControllerRevisionRevisionLabelKey]) } +// TestCreateControllerRevision_ExistingRevisionShouldRemainImmutable verifies that +// creating a ControllerRevision with an existing revision key does not mutate +// previously stored template data. +func TestCreateControllerRevision_ExistingRevisionShouldRemainImmutable(t *testing.T) { + ctx := context.Background() + client := kubefake.NewSimpleClientset() + + roleReplicas := int32(1) + ms := &workloadv1alpha1.ModelServing{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ms", + Namespace: "default", + UID: "test-uid", + }, + TypeMeta: metav1.TypeMeta{ + APIVersion: "workload.kthena.io/v1alpha1", + Kind: "ModelServing", + }, + } + + templateV1 := []workloadv1alpha1.Role{ + { + Name: "prefill", + Replicas: &roleReplicas, + EntryTemplate: workloadv1alpha1.PodTemplateSpec{ + Spec: corev1.PodSpec{}, + }, + }, + } + templateV1[0].EntryTemplate.Spec.Containers = []corev1.Container{{ + Name: "test-container", + Image: "nginx:1.28.2", + }} + + templateV2 := []workloadv1alpha1.Role{ + { + Name: "prefill", + Replicas: &roleReplicas, + EntryTemplate: workloadv1alpha1.PodTemplateSpec{ + Spec: corev1.PodSpec{}, + }, + }, + } + templateV2[0].EntryTemplate.Spec.Containers = []corev1.Container{{ + Name: "test-container", + Image: "nginx:alpine", + }} + + created, err := CreateControllerRevision(ctx, client, ms, "revision-v1", templateV1) + assert.NoError(t, err) + assert.NotNil(t, created) + assert.Equal(t, int64(1), created.Revision) + + _, err = CreateControllerRevision(ctx, client, ms, "revision-v1", templateV2) + assert.NoError(t, err) + + stored, err := GetControllerRevision(ctx, client, ms, "revision-v1") + assert.NoError(t, err) + assert.NotNil(t, stored) + + recovered, err := GetRolesFromControllerRevision(stored) + assert.NoError(t, err) + if assert.Len(t, recovered, 1) { + if assert.Len(t, recovered[0].EntryTemplate.Spec.Containers, 1) { + assert.Equal(t, "nginx:1.28.2", recovered[0].EntryTemplate.Spec.Containers[0].Image, + "existing revision payload should not be overwritten") + } + } + assert.Equal(t, int64(1), stored.Revision, + "existing revision object should remain unchanged for same revision key") +} + // TestCleanupOldControllerRevisions_PreservesCurrentAndUpdateRevisions tests that // CleanupOldControllerRevisions always preserves CurrentRevision and UpdateRevision func TestCleanupOldControllerRevisions_PreservesCurrentAndUpdateRevisions(t *testing.T) {