Commit 61381d6

Fixing the e2e test according to review
Signed-off-by: katara-Jayprakash <[email protected]>
1 parent: 943c22d

2 files changed

Lines changed: 94 additions & 8 deletions
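
In short: the controller change renames the effectiveRevision parameter and local variable to plain revision along the scale-up and pod-recreation paths, and adds a comment documenting the stored-revision fallback that protects partitioned groups during a rolling update; the test change adds TestManageRoleReplicas_PartitionedScaleUp, which checks revision selection on both sides of the partition.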


pkg/model-serving-controller/controller/model_serving_controller.go

Lines changed: 9 additions & 8 deletions
@@ -875,7 +875,7 @@ func (c *ModelServingController) scaleDownRoles(ctx context.Context, ms *workloa
 
 // scaleUpRoles handles Role scaling up.
 // It creates new Roles with increasing indices starting from the current max index + 1.
-func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, effectiveRevision string) {
+func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, roleList []datastore.Role, expectedCount int, servingGroupOrdinal int, revision string) {
 	startingIndex := 0
 	if len(roleList) > 0 {
 		_, ordinal := utils.GetParentNameAndOrdinal(roleList[len(roleList)-1].Name)
@@ -899,13 +899,13 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv
 	for i := 0; i < toCreate; i++ {
 		newIndex := startingIndex + i
 		// Create pods for role
-		err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, effectiveRevision)
+		err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, newIndex, servingGroupOrdinal, revision)
 		if err != nil {
 			klog.Errorf("create role %s for ServingGroup %s failed: %v", utils.GenerateRoleID(targetRole.Name, newIndex), groupName, err)
 		} else {
 			// Insert new Role to global storage
 			roleID := utils.GenerateRoleID(targetRole.Name, newIndex)
-			c.store.AddRole(utils.GetNamespaceName(ms), groupName, targetRole.Name, roleID, effectiveRevision)
+			c.store.AddRole(utils.GetNamespaceName(ms), groupName, targetRole.Name, roleID, revision)
 			// Emit event for new role entering Creating state
 			message := fmt.Sprintf("Role %s/%s in ServingGroup %s is now Creating", targetRole.Name, roleID, groupName)
 			c.emitRoleStatusEvent(ms, corev1.EventTypeNormal, "RoleCreating", message)
@@ -917,9 +917,10 @@ func (c *ModelServingController) scaleUpRoles(ctx context.Context, ms *workloadv
 // It handles both scale up and scale down operations for the role
 func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *workloadv1alpha1.ModelServing, groupName string, targetRole workloadv1alpha1.Role, servingGroupOrdinal int, newRevision string) {
 	// TODO: add podGroup update after gang scheduler finished
-	effectiveRevision := newRevision
-	if revision, ok := c.store.GetServingGroupRevision(utils.GetNamespaceName(ms), groupName); ok && revision != "" {
-		effectiveRevision = revision
+	// Use the stored revision for existing groups (partition-protected), otherwise use newRevision
+	revision := newRevision
+	if storedRevision, ok := c.store.GetServingGroupRevision(utils.GetNamespaceName(ms), groupName); ok && storedRevision != "" {
+		revision = storedRevision
 	}
 
 	// Get all replicas of a role from storage, for example, prefill-0, prefill-1...
@@ -955,7 +956,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor
 		if len(pods) < expectedPods {
 			klog.V(2).Infof("manageRoleReplicas: role %s/%s in ServingGroup %s is missing pods (%d/%d), recreating", targetRole.Name, roleObj.Name, groupName, len(pods), expectedPods)
 			_, roleIndex := utils.GetParentNameAndOrdinal(roleObj.Name)
-			if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, effectiveRevision); err != nil {
+			if err := c.CreatePodsByRole(ctx, *targetRole.DeepCopy(), ms, roleIndex, servingGroupOrdinal, revision); err != nil {
 				klog.Errorf("manageRoleReplicas: failed to recreate pods for role %s/%s in ServingGroup %s: %v", targetRole.Name, roleObj.Name, groupName, err)
 			}
 		}
@@ -964,7 +965,7 @@ func (c *ModelServingController) manageRoleReplicas(ctx context.Context, ms *wor
 	// Determine whether it is a scale-up or scale-down scenario
 	if len(roleList) < expectedCount {
 		klog.V(2).Infof("manageRoleReplicas: scaling UP role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount)
-		c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, effectiveRevision)
+		c.scaleUpRoles(ctx, ms, groupName, targetRole, roleList, expectedCount, servingGroupOrdinal, revision)
 	} else if len(roleList) > expectedCount {
 		klog.V(2).Infof("manageRoleReplicas: scaling DOWN role %s in ServingGroup %s: current=%d, expected=%d", targetRole.Name, groupName, len(roleList), expectedCount)
 		c.scaleDownRoles(ctx, ms, groupName, targetRole, roleList, expectedCount)
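
The heart of the controller change is the revision-selection rule at the top of manageRoleReplicas. Below is a minimal standalone sketch of that rule; the pickRevision helper and the demo main are hypothetical, since in the controller the logic is inline and the stored value comes from c.store.GetServingGroupRevision.

package main

import "fmt"

// pickRevision mirrors the inline selection logic (the helper name is
// hypothetical): a ServingGroup that already has a revision recorded in the
// store keeps it, which is what protects partition-guarded groups during a
// rolling update; a group with no stored revision falls back to newRevision.
func pickRevision(storedRevision string, found bool, newRevision string) string {
	if found && storedRevision != "" {
		return storedRevision
	}
	return newRevision
}

func main() {
	// An existing group below the partition keeps its stored revision.
	fmt.Println(pickRevision("revision-v1", true, "revision-v2")) // revision-v1
	// A freshly created group has nothing stored and picks up the new revision.
	fmt.Println(pickRevision("", false, "revision-v2")) // revision-v2
}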

pkg/model-serving-controller/controller/model_serving_controller_test.go

Lines changed: 85 additions & 0 deletions
@@ -2617,6 +2617,91 @@ func TestManageRoleReplicas_RecreateUsesServingGroupRevision(t *testing.T) {
 	}
 }
 
+// TestManageRoleReplicas_PartitionedScaleUp verifies that during a partitioned rolling update,
+// group 0 (protected by partition=1) uses old revision while group 1 uses new revision.
+func TestManageRoleReplicas_PartitionedScaleUp(t *testing.T) {
+	kubeClient := kubefake.NewSimpleClientset()
+	kthenaClient := kthenafake.NewSimpleClientset()
+	volcanoClient := volcanofake.NewSimpleClientset()
+	apiextClient := apiextfake.NewSimpleClientset(testhelper.CreatePodGroupCRD())
+
+	controller, err := NewModelServingController(kubeClient, kthenaClient, volcanoClient, apiextClient)
+	assert.NoError(t, err)
+
+	roleName := "default"
+	oldRevision := "revision-v1"
+	newRevision := "revision-v2"
+
+	ms := &workloadv1alpha1.ModelServing{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "default",
+			Name:      "test-partition-scale",
+			UID:       types.UID("ms-uid-partition"),
+		},
+		Spec: workloadv1alpha1.ModelServingSpec{
+			Replicas: ptr.To[int32](2),
+			Template: workloadv1alpha1.ServingGroup{
+				Roles: []workloadv1alpha1.Role{
+					{
+						Name:           roleName,
+						Replicas:       ptr.To[int32](1),
+						WorkerReplicas: 0,
+						EntryTemplate: workloadv1alpha1.PodTemplateSpec{
+							Spec: corev1.PodSpec{
+								Containers: []corev1.Container{{
+									Name:  "entry-container",
+									Image: "test-image:latest",
+								}},
+							},
+						},
+					},
+				},
+			},
+			RecoveryPolicy: workloadv1alpha1.RoleRecreate,
+		},
+	}
+
+	// Setup: group 0 exists with old revision (protected by partition=1)
+	group0Name := utils.GenerateServingGroupName(ms.Name, 0)
+	controller.store.AddServingGroup(utils.GetNamespaceName(ms), 0, oldRevision)
+	controller.store.AddRole(utils.GetNamespaceName(ms), group0Name, roleName, utils.GenerateRoleID(roleName, 0), oldRevision)
+
+	// Setup: group 1 exists with new revision (above partition, should use new revision)
+	group1Name := utils.GenerateServingGroupName(ms.Name, 1)
+	controller.store.AddServingGroup(utils.GetNamespaceName(ms), 1, newRevision)
+	controller.store.AddRole(utils.GetNamespaceName(ms), group1Name, roleName, utils.GenerateRoleID(roleName, 0), newRevision)
+
+	// Scale up roles in group 0 - should use old revision (partition-protected)
+	controller.manageRoleReplicas(context.Background(), ms, group0Name, ms.Spec.Template.Roles[0], 0, newRevision)
+
+	// Scale up roles in group 1 - should use new revision
+	controller.manageRoleReplicas(context.Background(), ms, group1Name, ms.Spec.Template.Roles[0], 1, newRevision)
+
+	// Verify group 0 pods use old revision
+	selector0 := labels.SelectorFromSet(map[string]string{
+		workloadv1alpha1.GroupNameLabelKey: group0Name,
+		workloadv1alpha1.RoleLabelKey:      roleName,
+	})
+	pods0, err := kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector0.String()})
+	assert.NoError(t, err)
+	if assert.Len(t, pods0.Items, 1) {
+		assert.Equal(t, oldRevision, pods0.Items[0].Labels[workloadv1alpha1.RevisionLabelKey],
+			"group 0 (partition-protected) should use old revision")
+	}
+
+	// Verify group 1 pods use new revision
+	selector1 := labels.SelectorFromSet(map[string]string{
+		workloadv1alpha1.GroupNameLabelKey: group1Name,
+		workloadv1alpha1.RoleLabelKey:      roleName,
+	})
+	pods1, err := kubeClient.CoreV1().Pods(ms.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector1.String()})
+	assert.NoError(t, err)
+	if assert.Len(t, pods1.Items, 1) {
+		assert.Equal(t, newRevision, pods1.Items[0].Labels[workloadv1alpha1.RevisionLabelKey],
+			"group 1 (above partition) should use new revision")
+	}
+}
+
 // TestScaleDownServingGroups tests the scaleDownServingGroups function with various scenarios
 func TestScaleDownServingGroups(t *testing.T) {
 	tests := []struct {
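
Assuming the repository's standard Go module layout, the new test can be run in isolation with: go test ./pkg/model-serving-controller/controller -run TestManageRoleReplicas_PartitionedScaleUp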
