Skip to content

Conversation

@chiayi
Copy link
Contributor

@chiayi chiayi commented Aug 28, 2025

Why are these changes needed?

Part of #3902. POC, Adds group indexing and host index to multi-host workers.

These labels are useful for running workloads with workers when numOfHosts > 1 and/or replicas > 1 for a worker group, where it's important that the workload runs on a specific worker or numOfHosts workers scaled as a part of the same replica. This is the case for TPU or GPU workloads with topology or worker index requirements.

Additionally, this PR adds logic to atomically delete a worker group replica where numOfHosts > 1. This logic is necessary because most multi-host workloads will hang and fail when a single worker in the group fails, and we should delete or restart these Pods together.

Related issue number

For: #3902

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

@chiayi chiayi force-pushed the multihost-indexing branch 2 times, most recently from 8a74046 to a6b94b3 Compare August 28, 2025 17:32
@chiayi
Copy link
Contributor Author

chiayi commented Aug 28, 2025

@ryanaoleary PTAL when you get the chance.

@ryanaoleary
Copy link
Collaborator

ryanaoleary commented Aug 29, 2025

It'd be good to make clear the value of this PR.

Currently host and replica indexing for multi-host workers occurs in a separate GKE webhook that injects these values as env vars and a k8s label. The env vars and replicaIndex label are then read from within Ray to set Raylet labels and do things like atomically scale multi-host slices with autoscaling.

This PR moves the logic for indexing KubeRay worker Pods that request TPU from the webhook to KubeRay itself. By assigning indices as k8s Pod labels directly from KubeRay when they are created, we avoid the necessity for complicated logic in the TPU webhook that tracks the state of multi-host replicas in a RayCluster using a PodInformer. Since these variables are already used in Ray core and libraries like Train to handle the multi-host case, it makes sense to consolidate the logic in KubeRay. Additionally, since KubeRay is aware of when Pods are deleted, it becomes easier to scale-down multi-host replicas atomically. Overall, this PR is consolidating logic that is currently spread across the TPU webhook, KubeRay, and Ray core.

The next step after this PR would be to move the environment variable injection that occurs in the TPU webhook to Ray core when the Raylet is started on a node. The worker lifecycle would then look as follows for multi-host workers:

  1. Ray sends a request to scale N workers to satisfy resource requests for TPU
  2. KubeRay scales up a multi-host (NumOfHosts > 1) replica with N Pods (1 Pod per host) and indexes each worker using k8s labels
  3. When the Raylet is started on each worker Pod, the information in the k8s Pod spec (ray.io/host-index and ray.io/replica-index labels) is used to set required Jax environment variables like TPU_WORKER_ID, TPU_WORKER_HOSTNAMES, and TPU_NAME and the corresponding Raylet node labels for label based scheduling.
  4. When a multi-host worker is deleted by KubeRay, we can check the ray.io/replica-index label to scale down the entire slice atomically.

@chiayi chiayi force-pushed the multihost-indexing branch from a6b94b3 to 6935b9e Compare August 29, 2025 17:56
}

// Check if RayTpuMulithostIndexing feature is enabled
// Currently multihostIndexing won't work with Autoscaler v2 since autoscaler delete doesn't follow replica groups
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Currently multihostIndexing won't work with Autoscaler v2 since autoscaler delete doesn't follow replica groups

Can you explain more why this wouldn't work with the v2 autoscaler? Since it currently scales by replicas my initial thinking was that there shouldn't be an incompatibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, if that's the case then it is an misunderstanding on my side based on our prior discussion where my interpretation was that there was incompatibility due to how it scaled the replicas. Will remove the autoscaling v2 check.

Copy link
Collaborator

@ryanaoleary ryanaoleary Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could just be forgetting what the incompatibility is, if I'm remembering correctly the v2 autoscaler determines the number of replicas of a group to scale, and then submits a scale request by patching both the replica count and workersToDelete of that group here.

There could be an issue with how the v2 autoscaler scales down here, since it doesn't consider whether to_delete_id is part of a multi-host group and will cause the entire group to scale down, but I think this might be fine though since we consider NumOfHosts in the desired num workers of a type here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add an e2e autoscaler test here https://github.com/ray-project/kuberay/tree/master/ray-operator/test/e2eautoscaler to verify the scale up/down behavior for both the V1 and V2 autoscaler in either this PR or a follow-up. That should probably be one of the requirements for moving this feature from alpha to beta.

@chiayi chiayi force-pushed the multihost-indexing branch 3 times, most recently from 45d53ae to bb602d5 Compare October 2, 2025 15:34
@ryanaoleary
Copy link
Collaborator

Once this is passing CI I think we can mark this as ready for review and ask other KubeRay contributors to review the feature.

@chiayi chiayi force-pushed the multihost-indexing branch 2 times, most recently from 5bebd86 to 1f21e83 Compare October 3, 2025 15:34
@chiayi chiayi marked this pull request as ready for review October 3, 2025 19:04
@chiayi chiayi force-pushed the multihost-indexing branch from 1f21e83 to 7419b56 Compare October 6, 2025 16:53
@chiayi chiayi changed the title [POC] Prototype multi-host indexing [TPU][RayCluster] Prototype multi-host indexing Oct 6, 2025
@andrewsykim andrewsykim changed the title [TPU][RayCluster] Prototype multi-host indexing [RayCluster] Prototype multi-host indexing Oct 6, 2025
return errstd.Join(utils.ErrFailedCreateWorkerPod, err)

// Worker creation path for multi-host indexing
if multihostIndexingEnabled {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic below seems pretty complicated, should we abstract it away in a util package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we do since all it's really doing is going through and creating the remaining workers in groups. Abstracting it away in a util package will introduce another layer of indirection and I thought that might be unnecessary.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I don't think a util package is necessary, but I moved it to it's own reconciliation function in e9a8b23 because reconcilePods was really long and I wanted it to be really clear what's behind the feature gate.

@andrewsykim
Copy link
Member

I removed the "[TPU]" from the title, we should ensure this implementation is generic enough to also be used for GPUs. For e.g. labelling worker pods in an NVLink using GB200s

@rueian
Copy link
Collaborator

rueian commented Oct 17, 2025

Hi @ryanaoleary, could you help to merge this with the master branch again? There is a fix for tests.

@ryanaoleary
Copy link
Collaborator

Hi @ryanaoleary, could you help to merge this with the master branch again? There is a fix for tests.

@rueian Done, should be up to date now

Comment on lines 825 to 899
logger := ctrl.LoggerFrom(ctx)

// 1. Group existing pods by ray.io/worker-group-replica-index.
replicaMap := make(map[string][]corev1.Pod)
for _, pod := range workerPods {
if replicaName, ok := pod.Labels[utils.RayWorkerReplicaIndexKey]; ok {
replicaMap[replicaName] = append(replicaMap[replicaName], pod)
}
}

// 2. Clean up incomplete replica groups with deleted Pods caused by external deletion.
for replicaName, podList := range replicaMap {
if len(podList) < int(worker.NumOfHosts) {
logger.Info("Found incomplete multi-host replica group, deleting all remaining pods to maintain atomicity.", "group", worker.GroupName, "replica", replicaName, "found", len(podList), "expected", worker.NumOfHosts)
if err := r.deletePods(ctx, instance, podList, worker.GroupName, "cleanup of incomplete multi-host group"); err != nil {
return err
}
// Requeue to avoid creating new Pods on this reconciliation.
return fmt.Errorf("cleaned up incomplete replica group %s, requeueing", replicaName)
}
}

// 3. Delete unhealthy replica groups.
deletedPods := make(map[string]struct{})
for _, pod := range workerPods {
if _, alreadyDeleted := deletedPods[pod.Name]; alreadyDeleted {
continue
}
if shouldDelete, reason := shouldDeletePod(pod, rayv1.WorkerNode); shouldDelete {
replicaName := pod.Labels[utils.RayWorkerReplicaIndexKey]
podsToDelete, ok := replicaMap[replicaName]
if !ok {
continue
}
logger.Info("Deleting unhealthy replica group.", "group", worker.GroupName, "replica", replicaName, "reason", reason)
if err := r.deletePods(ctx, instance, podsToDelete, worker.GroupName, reason); err != nil {
return err
}
// All Pods in the group have been deleted.
for _, p := range podsToDelete {
deletedPods[p.Name] = struct{}{}
}
}
}
if len(deletedPods) > 0 {
return fmt.Errorf("deleted %d unhealthy worker Pods in multi-host groups, requeueing", len(deletedPods))
}

// 4. Handle explicit deletions from the autoscaler.
if len(worker.ScaleStrategy.WorkersToDelete) > 0 {
podsToDeleteFromStrategy := make(map[string]corev1.Pod)
for _, podName := range worker.ScaleStrategy.WorkersToDelete {
for _, pod := range workerPods {
if pod.Name == podName {
replicaName := pod.Labels[utils.RayWorkerReplicaIndexKey]
for _, p := range replicaMap[replicaName] {
podsToDeleteFromStrategy[p.Name] = p
}
break
}
}
}

if len(podsToDeleteFromStrategy) > 0 {
logger.Info("removing the pods in the scaleStrategy of", "group", worker.GroupName, "podsToDelete", len(podsToDeleteFromStrategy))
var podsToDel []corev1.Pod
for _, p := range podsToDeleteFromStrategy {
podsToDel = append(podsToDel, p)
}
if err := r.deletePods(ctx, instance, podsToDel, worker.GroupName, "autoscaler scale-down request"); err != nil {
return err
}
worker.ScaleStrategy.WorkersToDelete = []string{}
return fmt.Errorf("deleted %d worker Pods based on ScaleStrategy, requeueing", len(podsToDel))
}
// Clear WorkersToDelete after deletion.
worker.ScaleStrategy.WorkersToDelete = []string{}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger := ctrl.LoggerFrom(ctx)
// 1. Group existing pods by ray.io/worker-group-replica-index.
replicaMap := make(map[string][]corev1.Pod)
for _, pod := range workerPods {
if replicaName, ok := pod.Labels[utils.RayWorkerReplicaIndexKey]; ok {
replicaMap[replicaName] = append(replicaMap[replicaName], pod)
}
}
// 2. Clean up incomplete replica groups with deleted Pods caused by external deletion.
for replicaName, podList := range replicaMap {
if len(podList) < int(worker.NumOfHosts) {
logger.Info("Found incomplete multi-host replica group, deleting all remaining pods to maintain atomicity.", "group", worker.GroupName, "replica", replicaName, "found", len(podList), "expected", worker.NumOfHosts)
if err := r.deletePods(ctx, instance, podList, worker.GroupName, "cleanup of incomplete multi-host group"); err != nil {
return err
}
// Requeue to avoid creating new Pods on this reconciliation.
return fmt.Errorf("cleaned up incomplete replica group %s, requeueing", replicaName)
}
}
// 3. Delete unhealthy replica groups.
deletedPods := make(map[string]struct{})
for _, pod := range workerPods {
if _, alreadyDeleted := deletedPods[pod.Name]; alreadyDeleted {
continue
}
if shouldDelete, reason := shouldDeletePod(pod, rayv1.WorkerNode); shouldDelete {
replicaName := pod.Labels[utils.RayWorkerReplicaIndexKey]
podsToDelete, ok := replicaMap[replicaName]
if !ok {
continue
}
logger.Info("Deleting unhealthy replica group.", "group", worker.GroupName, "replica", replicaName, "reason", reason)
if err := r.deletePods(ctx, instance, podsToDelete, worker.GroupName, reason); err != nil {
return err
}
// All Pods in the group have been deleted.
for _, p := range podsToDelete {
deletedPods[p.Name] = struct{}{}
}
}
}
if len(deletedPods) > 0 {
return fmt.Errorf("deleted %d unhealthy worker Pods in multi-host groups, requeueing", len(deletedPods))
}
// 4. Handle explicit deletions from the autoscaler.
if len(worker.ScaleStrategy.WorkersToDelete) > 0 {
podsToDeleteFromStrategy := make(map[string]corev1.Pod)
for _, podName := range worker.ScaleStrategy.WorkersToDelete {
for _, pod := range workerPods {
if pod.Name == podName {
replicaName := pod.Labels[utils.RayWorkerReplicaIndexKey]
for _, p := range replicaMap[replicaName] {
podsToDeleteFromStrategy[p.Name] = p
}
break
}
}
}
if len(podsToDeleteFromStrategy) > 0 {
logger.Info("removing the pods in the scaleStrategy of", "group", worker.GroupName, "podsToDelete", len(podsToDeleteFromStrategy))
var podsToDel []corev1.Pod
for _, p := range podsToDeleteFromStrategy {
podsToDel = append(podsToDel, p)
}
if err := r.deletePods(ctx, instance, podsToDel, worker.GroupName, "autoscaler scale-down request"); err != nil {
return err
}
worker.ScaleStrategy.WorkersToDelete = []string{}
return fmt.Errorf("deleted %d worker Pods based on ScaleStrategy, requeueing", len(podsToDel))
}
// Clear WorkersToDelete after deletion.
worker.ScaleStrategy.WorkersToDelete = []string{}
}
logger := ctrl.LoggerFrom(ctx)
deletedPods := make(map[string]struct{})
// 1. Group existing pods by ray.io/worker-group-replica-index.
replicaMap := make(map[string][]corev1.Pod)
for _, pod := range workerPods {
if replicaName, ok := pod.Labels[utils.RayWorkerReplicaIndexKey]; ok {
replicaMap[replicaName] = append(replicaMap[replicaName], pod)
}
}
// 2. Clean up incomplete replica groups with deleted Pods caused by external deletion.
for replicaName, podList := range replicaMap {
if len(podList) < int(worker.NumOfHosts) {
logger.Info("Found incomplete multi-host replica group, deleting all remaining pods to maintain atomicity.", "group", worker.GroupName, "replica", replicaName, "found", len(podList), "expected", worker.NumOfHosts)
if err := r.deletePods(ctx, instance, podList, worker.GroupName, "cleanup of incomplete multi-host group"); err != nil {
return err
}
for _, p := range podList {
deletedPods[p.Name] = struct{}{}
}
}
}
// 3. Delete unhealthy replica groups.
for _, pod := range workerPods {
if _, alreadyDeleted := deletedPods[pod.Name]; alreadyDeleted {
continue
}
if shouldDelete, reason := shouldDeletePod(pod, rayv1.WorkerNode); shouldDelete {
replicaName := pod.Labels[utils.RayWorkerReplicaIndexKey]
podsToDelete, ok := replicaMap[replicaName]
if !ok {
continue
}
logger.Info("Deleting unhealthy replica group.", "group", worker.GroupName, "replica", replicaName, "reason", reason)
if err := r.deletePods(ctx, instance, podsToDelete, worker.GroupName, reason); err != nil {
return err
}
// All Pods in the group have been deleted.
for _, p := range podsToDelete {
deletedPods[p.Name] = struct{}{}
}
}
}
// 4. Handle explicit deletions from the autoscaler.
if len(worker.ScaleStrategy.WorkersToDelete) > 0 {
podsToDeleteFromStrategy := make(map[string]corev1.Pod)
for _, podName := range worker.ScaleStrategy.WorkersToDelete {
for _, pod := range workerPods {
if pod.Name == podName {
replicaName := pod.Labels[utils.RayWorkerReplicaIndexKey]
for _, p := range replicaMap[replicaName] {
podsToDeleteFromStrategy[p.Name] = p
}
break
}
}
}
if len(podsToDeleteFromStrategy) > 0 {
logger.Info("removing the pods in the scaleStrategy of", "group", worker.GroupName, "podsToDelete", len(podsToDeleteFromStrategy))
var podsToDel []corev1.Pod
for _, p := range podsToDeleteFromStrategy {
if _, ok := deletedPods[p.Name]; ok {
continue
}
podsToDel = append(podsToDel, p)
}
if err := r.deletePods(ctx, instance, podsToDel, worker.GroupName, "autoscaler scale-down request"); err != nil {
return err
}
for _, p := range podsToDel {
deletedPods[p.Name] = struct{}{}
}
}
// Clear WorkersToDelete after deletion.
worker.ScaleStrategy.WorkersToDelete = []string{}
}

Could we avoid requeue on deletion?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that should work fine too, I only added:

if len(deletedPods) > 0 {
    return fmt.Errorf("deleted %d unhealthy worker Pods in multi-host groups, requeueing", len(deletedPods))
}

because that's the same way it's handled in reconcilePods in the existing deletion code. I'll accept the suggestion though because I think it should work fine to process all deletions on one iteration.

Copy link
Collaborator

@rueian rueian Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should work fine to process all deletions in one iteration. requeuing by returning an error cuasing stack trace being logged out is quite annoying. But I saw you latest change didn't do that in one iteration and CI failed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh sorry I thought I'd removed it but must not have committed the change, done in 29924f1.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the CI was failing because the feature was not enabled in the buildkite.

@ryanaoleary ryanaoleary requested a review from rueian October 20, 2025 11:55

isRayMultiHostIndexing := worker.NumOfHosts > 1 && features.Enabled(features.RayMultiHostIndexing)
if isRayMultiHostIndexing {
if err := r.reconcileMultiHostWorkerGroup(ctx, instance, &worker, workerPods.Items); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my own understanding, was it easier to separate the single-host and multi-host reconcilation into it's own function as opposed to trying to have a single reconcile with conditionals for multi-host?

My concern with separate functions is that in the future it will be easy to forget to update reconcileMultiHostWorkerGroup, but it seems fine if the complexity of merging both is too high

Copy link
Member

@andrewsykim andrewsykim Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping it separate for alpha is probably fine because we don't want to accidentally introduce changes in the default single-host code path, but for Beta (on by default), we may want to merge the code paths

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially it was a single reconcile with conditionals for multi-host, and yeah I changed it to a separate reconcile function because reconcilePods had become very long with several conditionals and I wanted the two code paths to be very clear and avoid introducing unintended changes to the regular path.

I'm good to change it to follow the previous pattern, whatever makes it easiest to merge. I agree that this might be better when we turn this feature on by default.

@ryanaoleary ryanaoleary force-pushed the multihost-indexing branch 2 times, most recently from 1f2047e to c12c1d8 Compare October 20, 2025 22:34
Copy link
Member

@Future-Outlier Future-Outlier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @ryanaoleary
is there a simple script I can try to test this PR on my kind kuberentes clusrter?
I can have multiple nodes in my kind cluster.

@ryanaoleary
Copy link
Collaborator

ryanaoleary commented Oct 21, 2025

Hi, @ryanaoleary is there a simple script I can try to test this PR on my kind kuberentes clusrter? I can have multiple nodes in my kind cluster.

@Future-Outlier None of the changes in this PR actually rely on the Pods being scheduled/running, so you can test it with the existing multi-host TPU RayCluster's in the Ray repo: https://github.com/ray-project/kuberay/blob/master/ray-operator/config/samples/ray-cluster.tpu-v6e-16-multihost.yaml.

The steps to manually test would be:

  1. Install the KubeRay operator with these changes and features.RayMultiHostIndexing enabled
  2. Create the RayCluster with a multi-host TPUs worker group.
  3. kubectl describe the Pods to check that these two labels are added:
ray.io/worker-group-replica-index: tpu-group-<random-5-digit-suffix>
ray.io/replica-host-index: <unique-int-from-0-to-N-1-within-same-replica>
  1. Manually kubectl delete one of the worker Pods, you should be able to observe that NumOfHost Pods are deleted and then scaled back up to satisfy the desired # of replicas.

chiayi and others added 8 commits October 21, 2025 10:01
Signed-off-by: Ryan O'Leary <[email protected]>
Signed-off-by: Ryan O'Leary <[email protected]>
Signed-off-by: Ryan O'Leary <[email protected]>
Signed-off-by: Ryan O'Leary <[email protected]>
Signed-off-by: Ryan O'Leary <[email protected]>
Signed-off-by: Ryan O'Leary <[email protected]>
@ryanaoleary
Copy link
Collaborator

ryanaoleary commented Oct 21, 2025

4. Manually kubectl delete one of the worker Pods, you should be able to observe that NumOfHost Pods are deleted and then scaled back up to satisfy the desired #

Alternatively, any RayCluster with a worker group with NumOfHosts > 1 can be used to test this, here is a simple spec:

apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-multihost
spec:
  headGroupSpec:
    rayStartParams: {}
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.50.0
            imagePullPolicy: IfNotPresent
            resources:
              limits:
                cpu: "8"
                ephemeral-storage: 20Gi
                memory: 40G
              requests:
                cpu: "8"
                ephemeral-storage: 10Gi
                memory: 40G
            ports:
              - containerPort: 6379
                name: gcs
              - containerPort: 8265
                name: dashboard
              - containerPort: 10001
                name: client
              - containerPort: 8000
                name: serve
  workerGroupSpecs:
  - rayStartParams: {}
    replicas: 1
    minReplicas: 0
    maxReplicas: 2
    numOfHosts: 4
    groupName: multi-host-group
    template:
      spec:
        containers:
          - name: ray-worker
            image: rayproject/ray:2.50.0
            imagePullPolicy: IfNotPresent
            resources:
              limits:
                cpu: "1"
                memory: 2G
              requests:
                cpu: "1"
                memory: 2G

we'd expect to see a ray.io/replica-host-index label with values from 0 to 3 on each of the Pods in one multi-host replica. Additionally, for each multi-host replica we'd expect to see an identical ray.io/worker-group-replica-index: multi-host-group-<random-5-digit-suffix> label on each of the Pods. If we increase replicas to 2, then the 4 new Pods created should have a unique ray.io/worker-group-replica-index: multi-host-group-<random-5-digit-suffix> label different than the original 4 Pods.

@rueian rueian merged commit 944b60c into ray-project:master Oct 22, 2025
27 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants