Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ func (c *ModelServingController) scaleDownRoles(ctx context.Context, ms *workloa

// scaleUpRoles handles Role scaling up.
// It creates new Roles with increasing indices starting from the current max index + 1.
func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, newRevision string) {
func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, revision string) {
startingIndex := 0
if len(roleList) > 0 {
_, ordinal := utils.GetParentNameAndOrdinal(roleList[len(roleList)-1].Name)
Expand All @@ -899,13 +899,13 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv
for i := 0; i < toCreate; i++ {
newIndex := startingIndex + i
// Create pods for role
err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, newRevision)
err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, revision)
if err != nil {
klog.Errorf("create role %s for ServingGroup %s failed: %v", utils.GenerateRoleID(targetRole.Name, newIndex), groupName, err)
} else {
// Insert new Role to global storage
roleID := utils.GenerateRoleID(targetRole.Name, newIndex)
c.store.AddRole(utils.GetNamespaceName(ms), groupName, targetRole.Name, roleID, newRevision)
c.store.AddRole(utils.GetNamespaceName(ms), groupName, targetRole.Name, roleID, revision)
// Emit event for new role entering Creating state
message := fmt.Sprintf("Role %s/%s in ServingGroup %s is now Creating", targetRole.Name, roleID, groupName)
c.emitRoleStatusEvent(ms, corev1.EventTypeNormal, "RoleCreating", message)
Expand All @@ -917,6 +917,12 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv
// It handles both scale up and scale down operations for the role
func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, servingGroupOrdinal int, newRevision string) {
// TODO: add podGroup update after gang scheduler finished
// Use the stored revision for existing groups (partition-protected), otherwise use newRevision
revision := newRevision
if storedRevision, ok := c.store.GetServingGroupRevision(utils.GetNamespaceName(ms), groupName); ok && storedRevision != "" {
revision = storedRevision
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think rolling updates should rely on manageServingGroupRollingUpdate together with the current/update revision and the partition recorded in status; the old revisions kept in the store are not the authoritative source for rolling updates.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the revision source were currentRevision or updateRevision, then once the partition has been set, currentRevision would remain unchanged.

Moreover, using only the revision information from ModelServing.Status, it is not possible to determine which serving groups still need to be upgraded.


// Get all replicas of a role from storage, for example, prefill-0, prefill-1...
roleList, err := c.store.GetRoleList(utils.GetNamespaceName(ms), groupName, targetRole.Name)
if err != nil {
Expand Down Expand Up @@ -950,7 +956,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor
if len(pods) < expectedPods {
klog.V(2).Infof("manageRoleReplicas: role %s/%s in ServingGroup %s is missing pods (%d/%d), recreating", targetRole.Name, roleObj.Name, groupName, len(pods), expectedPods)
_, roleIndex := utils.GetParentNameAndOrdinal(roleObj.Name)
if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, newRevision); err != nil {
if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, revision); err != nil {
klog.Errorf("manageRoleReplicas: failed to recreate pods for role %s/%s in ServingGroup %s: %v", targetRole.Name, roleObj.Name, groupName, err)
}
}
Expand All @@ -959,7 +965,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor
// Determine whether it is a scale-up or scale-down scenario
if len(roleList) < expectedCount {
klog.V(2).Infof("manageRoleReplicas: scaling UP role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount)
c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, newRevision)
c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, revision)
} else if len(roleList) > expectedCount {
klog.V(2).Infof("manageRoleReplicas: scaling DOWN role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount)
c.scaleDownRoles(ctx, ms, groupName, targetRole, roleList, expectedCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2557,6 +2557,151 @@ func TestManageRoleReplicas(t *testing.T) {
}
}

func TestManageRoleReplicas_RecreateUsesServingGroupRevision(t *testing.T) {
kubeClient := kubefake.NewSimpleClientset()
kthenaClient := kthenafake.NewSimpleClientset()
volcanoClient := volcanofake.NewSimpleClientset()
apiextClient := apiextfake.NewSimpleClientset(testhelper.CreatePodGroupCRD())

controller, err := NewModelServingController(kubeClient, kthenaClient, volcanoClient, apiextClient)
assert.NoError(t, err)

roleName := "default"
oldRevision := "revision-v1"
newRevision := "revision-v2"

ms := &workloadv1alpha1.ModelServing{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test-manage-role-group-rev",
UID: types.UID("ms-uid-group-rev"),
},
Spec: workloadv1alpha1.ModelServingSpec{
Replicas: ptr.To[int32](1),
Template: workloadv1alpha1.ServingGroup{
Roles: []workloadv1alpha1.Role{
{
Name: roleName,
Replicas: ptr.To[int32](1),
WorkerReplicas: 0,
EntryTemplate: workloadv1alpha1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "entry-container",
Image: "test-image:latest",
}},
},
},
},
},
},
RecoveryPolicy: workloadv1alpha1.RoleRecreate,
},
}

groupName := utils.GenerateServingGroupName(ms.Name, 0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please construct a real case, say test-manage-role-group-rev has two replicas, and partition = 1. So we can update the group 1 with the new revision, while group 0 still use the old revision

controller.store.AddServingGroup(utils.GetNamespaceName(ms), 0, oldRevision)
controller.store.AddRole(utils.GetNamespaceName(ms), groupName, roleName, utils.GenerateRoleID(roleName, 0), oldRevision)

controller.manageRoleReplicas(context.Background(), ms, groupName, ms.Spec.Template.Roles[0], 0, newRevision)

selector := labels.SelectorFromSet(map[string]string{
workloadv1alpha1.GroupNameLabelKey: groupName,
workloadv1alpha1.RoleLabelKey: roleName,
})
pods, err := kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector.String()})
assert.NoError(t, err)
if assert.Len(t, pods.Items, 1) {
assert.Equal(t, oldRevision, pods.Items[0].Labels[workloadv1alpha1.RevisionLabelKey],
"recreated pod for existing serving group should keep the serving group revision")
}
}

// TestManageRoleReplicas_PartitionedScaleUp verifies that during a partitioned
// rolling update, group 0 (protected by partition=1) keeps the old revision
// while group 1 is reconciled with the new revision.
func TestManageRoleReplicas_PartitionedScaleUp(t *testing.T) {
	kubeClient := kubefake.NewSimpleClientset()
	kthenaClient := kthenafake.NewSimpleClientset()
	volcanoClient := volcanofake.NewSimpleClientset()
	apiextClient := apiextfake.NewSimpleClientset(testhelper.CreatePodGroupCRD())

	controller, err := NewModelServingController(kubeClient, kthenaClient, volcanoClient, apiextClient)
	assert.NoError(t, err)

	roleName := "default"
	oldRevision := "revision-v1"
	newRevision := "revision-v2"

	// Single role with one entry pod and no workers.
	role := workloadv1alpha1.Role{
		Name:           roleName,
		Replicas:       ptr.To[int32](1),
		WorkerReplicas: 0,
		EntryTemplate: workloadv1alpha1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{
					Name:  "entry-container",
					Image: "test-image:latest",
				}},
			},
		},
	}

	ms := &workloadv1alpha1.ModelServing{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      "test-partition-scale",
			UID:       types.UID("ms-uid-partition"),
		},
		Spec: workloadv1alpha1.ModelServingSpec{
			Replicas:       ptr.To[int32](2),
			Template:       workloadv1alpha1.ServingGroup{Roles: []workloadv1alpha1.Role{role}},
			RecoveryPolicy: workloadv1alpha1.RoleRecreate,
		},
	}

	// seedGroup registers a serving group and its single role in the store at rev.
	seedGroup := func(ordinal int, rev string) string {
		name := utils.GenerateServingGroupName(ms.Name, ordinal)
		controller.store.AddServingGroup(utils.GetNamespaceName(ms), ordinal, rev)
		controller.store.AddRole(utils.GetNamespaceName(ms), name, roleName, utils.GenerateRoleID(roleName, 0), rev)
		return name
	}

	group0Name := seedGroup(0, oldRevision) // below partition: must stay on the old revision
	group1Name := seedGroup(1, newRevision) // above partition: already on the new revision

	// Reconcile both groups, offering the new revision as the candidate.
	controller.manageRoleReplicas(context.Background(), ms, group0Name, ms.Spec.Template.Roles[0], 0, newRevision)
	controller.manageRoleReplicas(context.Background(), ms, group1Name, ms.Spec.Template.Roles[0], 1, newRevision)

	// expectPodRevision asserts the single pod of a group carries revision rev.
	expectPodRevision := func(group, rev, msg string) {
		selector := labels.SelectorFromSet(map[string]string{
			workloadv1alpha1.GroupNameLabelKey: group,
			workloadv1alpha1.RoleLabelKey:      roleName,
		})
		pods, listErr := kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector.String()})
		assert.NoError(t, listErr)
		if assert.Len(t, pods.Items, 1) {
			assert.Equal(t, rev, pods.Items[0].Labels[workloadv1alpha1.RevisionLabelKey], msg)
		}
	}

	expectPodRevision(group0Name, oldRevision, "group 0 (partition-protected) should use old revision")
	expectPodRevision(group1Name, newRevision, "group 1 (above partition) should use new revision")
}

// TestScaleDownServingGroups tests the scaleDownServingGroups function with various scenarios
func TestScaleDownServingGroups(t *testing.T) {
tests := []struct {
Expand Down
14 changes: 3 additions & 11 deletions pkg/model-serving-controller/utils/controller_revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,10 @@ func CreateControllerRevision(ctx context.Context, client kubernetes.Interface,
controllerRevisionName := GenerateControllerRevisionName(ms.Name, revision)
existing, err := client.AppsV1().ControllerRevisions(ms.Namespace).Get(ctx, controllerRevisionName, metav1.GetOptions{})
if err == nil {
// If already exists, check if data has changed
// Existing revision snapshots are immutable. Keep the first payload for a
// given revision key to preserve deterministic rollback/recovery behavior.
if string(existing.Data.Raw) != string(data) {
existing.Data = runtime.RawExtension{
Raw: data,
}
existing.Revision++
updated, updateErr := client.AppsV1().ControllerRevisions(ms.Namespace).Update(ctx, existing, metav1.UpdateOptions{})
if updateErr != nil {
return nil, fmt.Errorf("failed to update ControllerRevision: %v", updateErr)
}
klog.V(4).Infof("Updated ControllerRevision %s/%s with revision %s", ms.Namespace, controllerRevisionName, revision)
return updated, nil
klog.Errorf("ControllerRevision %s/%s already exists with different payload for revision %s; preserving existing snapshot", ms.Namespace, controllerRevisionName, revision)
}
return existing, nil
} else if !apierrors.IsNotFound(err) {
Expand Down
73 changes: 73 additions & 0 deletions pkg/model-serving-controller/utils/controller_revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kubefake "k8s.io/client-go/kubernetes/fake"
Expand Down Expand Up @@ -107,6 +108,78 @@ func TestGetControllerRevision(t *testing.T) {
assert.Equal(t, "revision-v2", cr.Labels[ControllerRevisionRevisionLabelKey])
}

// TestCreateControllerRevision_ExistingRevisionShouldRemainImmutable verifies
// that calling CreateControllerRevision with an already-existing revision key
// leaves the previously stored template data untouched.
func TestCreateControllerRevision_ExistingRevisionShouldRemainImmutable(t *testing.T) {
	ctx := context.Background()
	client := kubefake.NewSimpleClientset()

	roleReplicas := int32(1)
	ms := &workloadv1alpha1.ModelServing{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-ms",
			Namespace: "default",
			UID:       "test-uid",
		},
		TypeMeta: metav1.TypeMeta{
			APIVersion: "workload.kthena.io/v1alpha1",
			Kind:       "ModelServing",
		},
	}

	// rolesWithImage builds a one-role template whose entry container runs image.
	rolesWithImage := func(image string) []workloadv1alpha1.Role {
		return []workloadv1alpha1.Role{{
			Name:     "prefill",
			Replicas: &roleReplicas,
			EntryTemplate: workloadv1alpha1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{
						Name:  "test-container",
						Image: image,
					}},
				},
			},
		}}
	}

	// First write wins: revision-v1 snapshots the nginx:1.28.2 template.
	created, err := CreateControllerRevision(ctx, client, ms, "revision-v1", rolesWithImage("nginx:1.28.2"))
	assert.NoError(t, err)
	assert.NotNil(t, created)
	assert.Equal(t, int64(1), created.Revision)

	// A second create under the same revision key with different data must not
	// overwrite the stored snapshot.
	_, err = CreateControllerRevision(ctx, client, ms, "revision-v1", rolesWithImage("nginx:alpine"))
	assert.NoError(t, err)

	stored, err := GetControllerRevision(ctx, client, ms, "revision-v1")
	assert.NoError(t, err)
	assert.NotNil(t, stored)

	recovered, err := GetRolesFromControllerRevision(stored)
	assert.NoError(t, err)
	if assert.Len(t, recovered, 1) {
		if assert.Len(t, recovered[0].EntryTemplate.Spec.Containers, 1) {
			assert.Equal(t, "nginx:1.28.2", recovered[0].EntryTemplate.Spec.Containers[0].Image,
				"existing revision payload should not be overwritten")
		}
	}
	assert.Equal(t, int64(1), stored.Revision,
		"existing revision object should remain unchanged for same revision key")
}

// TestCleanupOldControllerRevisions_PreservesCurrentAndUpdateRevisions tests that
// CleanupOldControllerRevisions always preserves CurrentRevision and UpdateRevision
func TestCleanupOldControllerRevisions_PreservesCurrentAndUpdateRevisions(t *testing.T) {
Expand Down
Loading