Skip to content

Commit 189ef64

Browse files
feat: fix partition e2e
Signed-off-by: katara-Jayprakash <[email protected]>
1 parent 138da5a commit 189ef64

1 file changed

Lines changed: 140 additions & 68 deletions

File tree

test/e2e/controller-manager/model_serving_test.go

Lines changed: 140 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"context"
2121
"fmt"
2222
"math/rand"
23+
"strconv"
24+
"strings"
2325
"sync"
2426
"testing"
2527
"time"
@@ -1251,6 +1253,7 @@ func TestModelServingPartitionBoundaryProtection(t *testing.T) {
12511253
require.NoError(t, err)
12521254
initialRevision := initialMS.Status.CurrentRevision
12531255
t.Logf("Initial CurrentRevision: %s", initialMS.Status.CurrentRevision)
1256+
require.NotEmpty(t, initialRevision, "Initial CurrentRevision should be set")
12541257

12551258
updatedMS := initialMS.DeepCopy()
12561259
updatedMS.Spec.Template.Roles[0].EntryTemplate.Spec.Containers[0].Image = nginxAlpineImage
@@ -1260,21 +1263,8 @@ func TestModelServingPartitionBoundaryProtection(t *testing.T) {
12601263
require.NoError(t, err)
12611264
utils.WaitForModelServingReady(t, ctx, kthenaClient, testNamespace, modelServing.Name)
12621265

1263-
// Wait for partition state to converge (both status and actual pod images)
1264-
require.Eventually(t, func() bool {
1265-
ms, err := kthenaClient.WorkloadV1alpha1().ModelServings(testNamespace).Get(ctx, modelServing.Name, metav1.GetOptions{})
1266-
if err != nil {
1267-
return false
1268-
}
1269-
protectedCorrect, updatedCorrect := verifyPartitionState(t, ctx, kubeClient, modelServing.Name, partition, replicas)
1270-
t.Logf("CurrentRevision: %s, UpdateRevision: %s, Protected: %d/%d, Updated: %d/%d",
1271-
ms.Status.CurrentRevision, ms.Status.UpdateRevision, protectedCorrect, partition, updatedCorrect, replicas-partition)
1272-
return ms.Status.CurrentRevision == initialRevision &&
1273-
ms.Status.UpdateRevision != "" &&
1274-
ms.Status.UpdateRevision != initialRevision &&
1275-
protectedCorrect == int(partition) &&
1276-
updatedCorrect == int(replicas-partition)
1277-
}, 3*time.Minute, 2*time.Second, "Partition state did not converge")
1266+
updateRevision := waitForPartitionState(t, ctx, kthenaClient, kubeClient, modelServing.Name, partition, replicas, initialRevision)
1267+
assert.NotEqual(t, initialRevision, updateRevision)
12781268
}
12791269

12801270
// TestModelServingPartitionDeletedGroupHistoricalRevision verifies deleted groups
@@ -1294,7 +1284,9 @@ func TestModelServingPartitionDeletedGroupHistoricalRevision(t *testing.T) {
12941284

12951285
initialMS, err := kthenaClient.WorkloadV1alpha1().ModelServings(testNamespace).Get(ctx, modelServing.Name, metav1.GetOptions{})
12961286
require.NoError(t, err)
1297-
t.Logf("Initial CurrentRevision: %s", initialMS.Status.CurrentRevision)
1287+
initialRevision := initialMS.Status.CurrentRevision
1288+
t.Logf("Initial CurrentRevision: %s", initialRevision)
1289+
require.NotEmpty(t, initialRevision, "Initial CurrentRevision should be set")
12981290

12991291
updatedMS := initialMS.DeepCopy()
13001292
updatedMS.Spec.Template.Roles[0].EntryTemplate.Spec.Containers[0].Image = nginxAlpineImage
@@ -1304,12 +1296,7 @@ func TestModelServingPartitionDeletedGroupHistoricalRevision(t *testing.T) {
13041296
require.NoError(t, err)
13051297
utils.WaitForModelServingReady(t, ctx, kthenaClient, testNamespace, modelServing.Name)
13061298

1307-
// Wait for partition state to converge
1308-
require.Eventually(t, func() bool {
1309-
protectedCorrect, updatedCorrect := verifyPartitionState(t, ctx, kubeClient, modelServing.Name, partition, replicas)
1310-
t.Logf("Protected: %d/%d, Updated: %d/%d", protectedCorrect, partition, updatedCorrect, replicas-partition)
1311-
return protectedCorrect == int(partition) && updatedCorrect == int(replicas-partition)
1312-
}, 3*time.Minute, 2*time.Second, "Partition state did not converge")
1299+
updateRevision := waitForPartitionState(t, ctx, kthenaClient, kubeClient, modelServing.Name, partition, replicas, initialRevision)
13131300
t.Log("Partitioned update established")
13141301

13151302
targetOrdinal := 1
@@ -1323,7 +1310,7 @@ func TestModelServingPartitionDeletedGroupHistoricalRevision(t *testing.T) {
13231310
require.NotEmpty(t, pods.Items)
13241311

13251312
podToDelete := pods.Items[0]
1326-
originalUID := podToDelete.UID
1313+
originalUID := string(podToDelete.UID)
13271314
t.Logf("Deleting pod %s (ordinal %d)", podToDelete.Name, targetOrdinal)
13281315

13291316
err = kubeClient.CoreV1().Pods(testNamespace).Delete(ctx, podToDelete.Name, metav1.DeleteOptions{})
@@ -1332,33 +1319,30 @@ func TestModelServingPartitionDeletedGroupHistoricalRevision(t *testing.T) {
13321319
utils.WaitForModelServingReady(t, ctx, kthenaClient, testNamespace, modelServing.Name)
13331320

13341321
require.Eventually(t, func() bool {
1335-
pods, err := kubeClient.CoreV1().Pods(testNamespace).List(ctx, metav1.ListOptions{
1336-
LabelSelector: labelSelector,
1337-
})
1338-
if err != nil || len(pods.Items) == 0 {
1322+
ordinalStates, err := collectRunningServingGroupStates(ctx, kubeClient, modelServing.Name)
1323+
if err != nil {
1324+
t.Logf("Failed to collect serving group states: %v", err)
13391325
return false
13401326
}
1341-
1342-
for _, pod := range pods.Items {
1343-
if pod.DeletionTimestamp != nil || pod.UID == originalUID {
1344-
continue
1345-
}
1346-
if pod.Status.Phase != corev1.PodRunning {
1347-
continue
1348-
}
1349-
for _, container := range pod.Spec.Containers {
1350-
if container.Image == nginxImage {
1351-
t.Logf("Recreated pod %s uses historical image", pod.Name)
1352-
return true
1353-
}
1354-
}
1327+
state, ok := ordinalStates[int32(targetOrdinal)]
1328+
if !ok {
1329+
return false
13551330
}
1356-
return false
1331+
t.Logf("Recreated protected ordinal %d => group=%s pod=%s revision=%s image=%s", targetOrdinal, state.GroupName, state.PodName, state.Revision, state.Image)
1332+
return state.PodUID != originalUID &&
1333+
state.Revision == initialRevision &&
1334+
state.Image == nginxImage
13571335
}, 3*time.Minute, 2*time.Second, "Recreated pod should use historical revision")
13581336

1359-
protectedCorrect, updatedCorrect := verifyPartitionState(t, ctx, kubeClient, modelServing.Name, partition, replicas)
1337+
finalMS, err := kthenaClient.WorkloadV1alpha1().ModelServings(testNamespace).Get(ctx, modelServing.Name, metav1.GetOptions{})
1338+
require.NoError(t, err)
1339+
ordinalStates, err := collectRunningServingGroupStates(ctx, kubeClient, modelServing.Name)
1340+
require.NoError(t, err)
1341+
protectedCorrect, updatedCorrect := verifyPartitionState(t, ordinalStates, partition, replicas, initialRevision, updateRevision)
13601342
assert.Equal(t, int(partition), protectedCorrect)
13611343
assert.Equal(t, int(replicas-partition), updatedCorrect)
1344+
assert.Equal(t, initialRevision, finalMS.Status.CurrentRevision)
1345+
assert.Equal(t, updateRevision, finalMS.Status.UpdateRevision)
13621346
}
13631347

13641348
// TestModelServingRollingUpdate verifies rolling updates without partition.
@@ -1367,7 +1351,7 @@ func TestModelServingRollingUpdate(t *testing.T) {
13671351

13681352
const replicas = int32(3)
13691353

1370-
modelServing := createBasicModelServing("test-rolling-update", replicas)
1354+
modelServing := createBasicModelServing("test-rolling-update", replicas, 0)
13711355
t.Logf("Creating ModelServing with %d replicas", replicas)
13721356
createAndWaitForModelServing(t, ctx, kthenaClient, modelServing)
13731357

@@ -1391,9 +1375,18 @@ func TestModelServingRollingUpdate(t *testing.T) {
13911375

13921376
finalMS, err := kthenaClient.WorkloadV1alpha1().ModelServings(testNamespace).Get(ctx, modelServing.Name, metav1.GetOptions{})
13931377
require.NoError(t, err)
1378+
require.NotEmpty(t, finalMS.Status.UpdateRevision, "UpdateRevision should be set after rollout")
13941379

13951380
assert.Equal(t, finalMS.Status.CurrentRevision, finalMS.Status.UpdateRevision)
13961381
assert.NotEqual(t, initialRevision, finalMS.Status.UpdateRevision)
1382+
1383+
ordinalStates, err := collectRunningServingGroupStates(ctx, kubeClient, modelServing.Name)
1384+
require.NoError(t, err)
1385+
require.Len(t, ordinalStates, int(replicas), "Expected one running group per replica after rollout")
1386+
for ordinal, state := range ordinalStates {
1387+
assert.Equalf(t, finalMS.Status.UpdateRevision, state.Revision, "Ordinal %d should use UpdateRevision without partition", ordinal)
1388+
assert.Equalf(t, nginxAlpineImage, state.Image, "Ordinal %d should run the updated image without partition", ordinal)
1389+
}
13971390
t.Logf("Rolling update completed - CurrentRevision: %s", finalMS.Status.CurrentRevision)
13981391
}
13991392

@@ -1442,35 +1435,114 @@ func createPartitionedModelServing(name string, replicas, partition int32) *work
14421435
}
14431436
}
14441437

1445-
func verifyPartitionState(t *testing.T, ctx context.Context, kubeClient *kubernetes.Clientset,
1446-
msName string, partition, replicas int32) (protectedCorrect, updatedCorrect int) {
1447-
t.Helper()
1448-
for ordinal := int32(0); ordinal < replicas; ordinal++ {
1449-
groupName := fmt.Sprintf("%s-%d", msName, ordinal)
1450-
labelSelector := fmt.Sprintf("modelserving.volcano.sh/group-name=%s", groupName)
1438+
type servingGroupState struct {
1439+
GroupName string
1440+
PodName string
1441+
PodUID string
1442+
Ordinal int32
1443+
Revision string
1444+
Image string
1445+
}
14511446

1452-
pods, err := kubeClient.CoreV1().Pods(testNamespace).List(ctx, metav1.ListOptions{
1453-
LabelSelector: labelSelector,
1454-
})
1455-
if err != nil || len(pods.Items) == 0 {
1447+
func collectRunningServingGroupStates(ctx context.Context, kubeClient *kubernetes.Clientset, msName string) (map[int32]servingGroupState, error) {
1448+
pods, err := kubeClient.CoreV1().Pods(testNamespace).List(ctx, metav1.ListOptions{
1449+
LabelSelector: modelServingLabelSelector(msName),
1450+
})
1451+
if err != nil {
1452+
return nil, err
1453+
}
1454+
1455+
states := make(map[int32]servingGroupState)
1456+
for _, pod := range pods.Items {
1457+
if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodRunning {
1458+
continue
1459+
}
1460+
groupName := pod.Labels[workload.GroupNameLabelKey]
1461+
if groupName == "" {
1462+
continue
1463+
}
1464+
ordinal, ok := parseServingGroupOrdinal(groupName, msName)
1465+
if !ok {
1466+
continue
1467+
}
1468+
revision := pod.Labels[workload.RevisionLabelKey]
1469+
if revision == "" || len(pod.Spec.Containers) == 0 {
14561470
continue
14571471
}
14581472

1459-
for _, pod := range pods.Items {
1460-
if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodRunning {
1461-
continue
1462-
}
1473+
state := servingGroupState{
1474+
GroupName: groupName,
1475+
PodName: pod.Name,
1476+
PodUID: string(pod.UID),
1477+
Ordinal: int32(ordinal),
1478+
Revision: revision,
1479+
Image: pod.Spec.Containers[0].Image,
1480+
}
1481+
if existing, ok := states[state.Ordinal]; !ok || state.PodName < existing.PodName {
1482+
states[state.Ordinal] = state
1483+
}
1484+
}
14631485

1464-
for _, container := range pod.Spec.Containers {
1465-
isProtected := ordinal < partition
1466-
if isProtected && container.Image == nginxImage {
1467-
protectedCorrect++
1468-
} else if !isProtected && container.Image == nginxAlpineImage {
1469-
updatedCorrect++
1470-
}
1471-
break // Only check first container
1472-
}
1473-
break // Only check first non-terminating running pod
1486+
return states, nil
1487+
}
1488+
1489+
func parseServingGroupOrdinal(groupName, msName string) (int, bool) {
1490+
prefix := msName + "-"
1491+
if !strings.HasPrefix(groupName, prefix) {
1492+
return 0, false
1493+
}
1494+
ordinal, err := strconv.Atoi(strings.TrimPrefix(groupName, prefix))
1495+
if err != nil {
1496+
return 0, false
1497+
}
1498+
return ordinal, true
1499+
}
1500+
1501+
func waitForPartitionState(t *testing.T, ctx context.Context, kthenaClient *clientset.Clientset,
1502+
kubeClient *kubernetes.Clientset, msName string, partition, replicas int32, initialRevision string) string {
1503+
t.Helper()
1504+
1505+
var updateRevision string
1506+
require.Eventually(t, func() bool {
1507+
ms, err := kthenaClient.WorkloadV1alpha1().ModelServings(testNamespace).Get(ctx, msName, metav1.GetOptions{})
1508+
if err != nil {
1509+
return false
1510+
}
1511+
ordinalStates, err := collectRunningServingGroupStates(ctx, kubeClient, msName)
1512+
if err != nil {
1513+
t.Logf("Failed to collect serving group states: %v", err)
1514+
return false
1515+
}
1516+
protectedCorrect, updatedCorrect := verifyPartitionState(t, ordinalStates, partition, replicas, initialRevision, ms.Status.UpdateRevision)
1517+
t.Logf("CurrentRevision: %s, UpdateRevision: %s, Protected: %d/%d, Updated: %d/%d",
1518+
ms.Status.CurrentRevision, ms.Status.UpdateRevision, protectedCorrect, partition, updatedCorrect, replicas-partition)
1519+
if ms.Status.CurrentRevision != initialRevision ||
1520+
ms.Status.UpdateRevision == "" ||
1521+
ms.Status.UpdateRevision == initialRevision ||
1522+
protectedCorrect != int(partition) ||
1523+
updatedCorrect != int(replicas-partition) {
1524+
return false
1525+
}
1526+
updateRevision = ms.Status.UpdateRevision
1527+
return true
1528+
}, 3*time.Minute, 2*time.Second, "Partition state did not converge")
1529+
1530+
return updateRevision
1531+
}
1532+
1533+
func verifyPartitionState(t *testing.T, ordinalStates map[int32]servingGroupState,
1534+
partition, replicas int32, currentRevision, updateRevision string) (protectedCorrect, updatedCorrect int) {
1535+
t.Helper()
1536+
for ordinal := int32(0); ordinal < replicas; ordinal++ {
1537+
state, ok := ordinalStates[ordinal]
1538+
if !ok {
1539+
continue
1540+
}
1541+
isProtected := ordinal < partition
1542+
if isProtected && state.Revision == currentRevision && state.Image == nginxImage {
1543+
protectedCorrect++
1544+
} else if !isProtected && state.Revision == updateRevision && state.Image == nginxAlpineImage {
1545+
updatedCorrect++
14741546
}
14751547
}
14761548
return

0 commit comments

Comments
 (0)