Commits (36)
cb6172c
[Bug][RayJob] Sidecar mode shouldn't restart head pod when head pod i…
400Ping Nov 27, 2025
415ee29
[fix] fix CI error
400Ping Nov 28, 2025
2bbf8cb
update
400Ping Dec 15, 2025
2bf33f3
reunite if statement
400Ping Dec 17, 2025
a97a3b5
update
400Ping Dec 17, 2025
c4bfd24
fix ci error
400Ping Dec 17, 2025
e7499ad
fix
400Ping Dec 18, 2025
714d760
put back unnecessary comment deletion
400Ping Dec 18, 2025
60aba9c
Better rayjob logic
Future-Outlier Dec 22, 2025
8a7c66f
update
Future-Outlier Dec 22, 2025
45bb98a
update
Future-Outlier Dec 22, 2025
59ef8b3
update
Future-Outlier Dec 22, 2025
2464704
update
Future-Outlier Dec 22, 2025
03ff4fe
Update ray-operator/test/e2erayjob/rayjob_test.go
400Ping Dec 31, 2025
63957d1
Update ray-operator/test/e2erayjob/rayjob_test.go
400Ping Dec 31, 2025
3115ae4
update rayjob test
400Ping Jan 1, 2026
6f2dfa3
Merge branch 'master' into bug/sidecar-mode-fix
400Ping Jan 1, 2026
76828e7
Merge branch 'master' into bug/sidecar-mode-fix
400Ping Jan 1, 2026
a5b30a4
fix merge conflict error
400Ping Jan 1, 2026
e77db80
Update ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go
400Ping Jan 1, 2026
33afa20
update
400Ping Jan 2, 2026
ec01312
revert reason assertion
400Ping Jan 2, 2026
6e7c738
[chore] retrigger ci
400Ping Jan 3, 2026
e155b5e
update
400Ping Jan 3, 2026
1f9dbe8
[chore] change from HeadPod to GetHeadPod
400Ping Jan 3, 2026
883eb7c
add submission mode label key label
Future-Outlier Jan 4, 2026
d00b7c6
Merge remote-tracking branch 'upstream/master' into bug/sidecar-mode-fix
Future-Outlier Jan 4, 2026
7246d33
Update ray-operator/controllers/ray/utils/constant.go
400Ping Jan 6, 2026
9a7eaa4
Update ray-operator/controllers/ray/raycluster_controller.go
400Ping Jan 6, 2026
d02c6a7
Update ray-operator/controllers/ray/raycluster_controller.go
400Ping Jan 6, 2026
f3d9431
Update ray-operator/controllers/ray/rayjob_controller.go
400Ping Jan 6, 2026
a59e486
Update ray-operator/controllers/ray/rayjob_controller.go
400Ping Jan 7, 2026
729cf0c
Update ray-operator/controllers/ray/utils/constant.go
400Ping Jan 7, 2026
8d07ece
Update ray-operator/controllers/ray/rayjob_controller.go
400Ping Jan 7, 2026
04caf77
update
400Ping Jan 7, 2026
c0d916d
Add missing label
400Ping Jan 7, 2026
22 changes: 12 additions & 10 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -707,8 +707,8 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
return errstd.New(reason)
}
} else if len(headPods.Items) == 0 {
originatedFrom := utils.GetCRDType(instance.Labels[utils.RayOriginatedFromCRDLabelKey])
if originatedFrom == utils.RayJobCRD {
if meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterProvisioned)) &&
shouldSkipHeadPodRestart(instance) {
// Recreating the head Pod if the RayCluster created by RayJob is provisioned doesn't help RayJob.
//
// Case 1: GCS fault tolerance is disabled
@@ -720,13 +720,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
//
// In this case, the worker Pods will not be killed by the new head Pod when it is created, but the submission ID has already been
// used by the old Ray job, so the new Ray job will fail.
if meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterProvisioned)) {
logger.Info(
"reconcilePods: Found 0 head Pods for a RayJob-managed RayCluster; skipping head creation to let RayJob controller handle the failure",
"rayCluster", instance.Name,
)
return nil
}
logger.Info(
"reconcilePods: Found 0 head Pods for the RayCluster; Skipped head recreation due to ray.io/disable-provisioned-head-restart",
"rayCluster", instance.Name,
)
return nil
}
// Create head Pod if it does not exist.
logger.Info("reconcilePods: Found 0 head Pods; creating a head Pod for the RayCluster.")
@@ -858,7 +856,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
}
}
}
logger.Info("reconcilePods", "found existing replica indices", "group", worker.GroupName, "indices", validReplicaIndices)
logger.Info("reconcilePods: found existing replica indices", "group", worker.GroupName, "indices", validReplicaIndices)
}
if diff > 0 {
// pods need to be added
@@ -1122,6 +1120,10 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context
return nil
}

func shouldSkipHeadPodRestart(instance *rayv1.RayCluster) bool {
return instance.Annotations[utils.DisableProvisionedHeadRestartAnnotationKey] == "true"
}

// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on RayClusterSpec changes
func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool {
logger := ctrl.LoggerFrom(ctx)
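A minimal sketch (not part of this diff) of a table-driven unit test for the new shouldSkipHeadPodRestart helper; the package name, import paths, and the annotation key constant are assumed from the surrounding context:

// Sketch only: exercises shouldSkipHeadPodRestart with and without the
// ray.io/disable-provisioned-head-restart annotation.
package ray

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

func TestShouldSkipHeadPodRestart(t *testing.T) {
	tests := []struct {
		name        string
		annotations map[string]string
		want        bool
	}{
		{name: "annotation set to true", annotations: map[string]string{utils.DisableProvisionedHeadRestartAnnotationKey: "true"}, want: true},
		{name: "annotation set to false", annotations: map[string]string{utils.DisableProvisionedHeadRestartAnnotationKey: "false"}, want: false},
		{name: "annotation missing", annotations: nil, want: false},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			cluster := &rayv1.RayCluster{ObjectMeta: metav1.ObjectMeta{Annotations: tc.annotations}}
			if got := shouldSkipHeadPodRestart(cluster); got != tc.want {
				t.Errorf("shouldSkipHeadPodRestart() = %v, want %v", got, tc.want)
			}
		})
	}
}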
7 changes: 5 additions & 2 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -905,6 +905,7 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
if err != nil {
return nil, err
}

if r.options.BatchSchedulerManager != nil && rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
// Group name is only used for individual pods to specify their task group ("headgroup", "worker-group-1", etc.).
@@ -937,8 +938,10 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.RayJob, rayClusterName string) (*rayv1.RayCluster, error) {
func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.RayJob, rayClusterName string) (*rayv1.RayCluster, error) {
labels := make(map[string]string, len(rayJobInstance.Labels))
maps.Copy(labels, rayJobInstance.Labels)
labels[utils.RayOriginatedFromCRNameLabelKey] = rayJobInstance.Name
labels[utils.RayOriginatedFromCRDLabelKey] = utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD)
if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
annotations[utils.DisableProvisionedHeadRestartAnnotationKey] = "true"
}

rayCluster := &rayv1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
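The hunk above assigns into an annotations map that is not visible in this excerpt; presumably it is built alongside labels earlier in constructRayClusterForRayJob. A hedged sketch of what that construction could look like (the PR's exact code may differ):

// Hypothetical sketch: build the annotations map the same way the labels map is
// built above, so the sidecar-mode assignment has a target.
annotations := make(map[string]string, len(rayJobInstance.Annotations)+1)
maps.Copy(annotations, rayJobInstance.Annotations)

The map would then be attached as ObjectMeta.Annotations on the RayCluster constructed below, next to Labels.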
3 changes: 3 additions & 0 deletions ray-operator/controllers/ray/utils/constant.go
@@ -29,6 +29,9 @@ const (
KubeRayVersion = "ray.io/kuberay-version"
RayCronJobNameLabelKey = "ray.io/cronjob-name"
RayCronJobTimestampAnnotationKey = "ray.io/cronjob-scheduled-timestamp"
RayJobSubmissionModeLabelKey = "ray.io/job-submission-mode"
Collaborator: Curious, why do we need this label? It doesn't seem to be used anywhere.

Member: Because we want to let users know that only sidecar mode will not provision the head Pod twice.

// DisableProvisionedHeadRestartAnnotationKey marks RayClusters created for sidecar-mode RayJobs to skip head Pod recreation after provisioning.
DisableProvisionedHeadRestartAnnotationKey = "ray.io/disable-provisioned-head-restart"

// Labels for feature RayMultihostIndexing
//
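As the review thread above notes, the submission-mode label is informational: it records on the RayCluster which RayJob submission mode created it, while only sidecar mode also carries the annotation that disables head Pod recreation. An illustrative, hedged sketch of how a consumer might read both (variable names are assumptions):

// Illustrative only: report how a RayJob-created RayCluster was submitted and whether
// head Pod recreation after provisioning is disabled for it.
mode := rayCluster.Labels[utils.RayJobSubmissionModeLabelKey]
skipHeadRestart := rayCluster.Annotations[utils.DisableProvisionedHeadRestartAnnotationKey] == "true"
logger.Info("RayCluster origin", "submissionMode", mode, "skipHeadPodRestart", skipHeadRestart)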
13 changes: 13 additions & 0 deletions ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go
@@ -2,6 +2,7 @@ package e2erayjob

import (
"testing"
"time"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -179,12 +180,24 @@ env_vars:
g.Expect(err).NotTo(HaveOccurred())
rayCluster, err := GetRayCluster(test, rayJob.Namespace, rayJob.Status.RayClusterName)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(rayCluster.Labels[utils.RayJobSubmissionModeLabelKey]).To(Equal(string(rayv1.SidecarMode)))
g.Expect(rayCluster.Labels[utils.RayJobDisableProvisionedHeadNodeRestartLabelKey]).To(Equal("true"))
headPod, err := GetHeadPod(test, rayCluster)
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Deleting head Pod %s/%s for RayCluster %s", headPod.Namespace, headPod.Name, rayCluster.Name)
err = test.Client().Core().CoreV1().Pods(headPod.Namespace).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
g.Expect(err).NotTo(HaveOccurred())

// Head pod should NOT be recreated for sidecar mode.

g.Eventually(func() (*corev1.Pod, error) {
return GetHeadPodOrNil(test, rayCluster)
}, TestTimeoutMedium, 2*time.Second).Should(BeNil())
g.Consistently(func() (*corev1.Pod, error) {
return GetHeadPodOrNil(test, rayCluster)
}, TestTimeoutShort, 2*time.Second).Should(BeNil())

// After head pod deletion, controller should mark RayJob as Failed with a specific message
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
10 changes: 8 additions & 2 deletions ray-operator/test/e2erayjob/rayjob_test.go
@@ -279,7 +279,7 @@ env_vars:
To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded)))
})

test.T().Run("RayJob fails when head Pod is deleted when job is running", func(_ *testing.T) {
test.T().Run("RayJob controller recreates the head Pod if it is deleted while the job is running", func(_ *testing.T) {
rayJobAC := rayv1ac.RayJob("delete-head-after-submit", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(NewRayClusterSpec()).
@@ -300,17 +300,23 @@ env_vars:
g.Expect(err).NotTo(HaveOccurred())
rayCluster, err := GetRayCluster(test, rayJob.Namespace, rayJob.Status.RayClusterName)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(rayCluster.Labels).To(HaveKeyWithValue(utils.RayJobSubmissionModeLabelKey, string(rayv1.K8sJobMode)))
g.Expect(rayCluster.Labels).To(HaveKeyWithValue(utils.RayJobDisableProvisionedHeadNodeRestartLabelKey, "false"))
headPod, err := GetHeadPod(test, rayCluster)
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Deleting head Pod %s/%s for RayCluster %s", headPod.Namespace, headPod.Name, rayCluster.Name)
err = test.Client().Core().CoreV1().Pods(headPod.Namespace).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
g.Expect(err).NotTo(HaveOccurred())

// After head pod deletion, controller should mark RayJob as Failed with a specific message
// Head pod should be recreated for non-sidecar modes.
g.Eventually(func() (*corev1.Pod, error) {
return GetHeadPod(test, rayCluster)
}, TestTimeoutMedium, 2*time.Second).ShouldNot(BeNil())
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobReason, Or(
Equal(rayv1.AppFailed),
Equal(rayv1.JobDeploymentStatusTransitionGracePeriodExceeded),
Equal(rayv1.SubmissionFailed),
)))
23 changes: 23 additions & 0 deletions ray-operator/test/support/ray.go
@@ -135,6 +135,12 @@ func HeadPod(t Test, rayCluster *rayv1.RayCluster) func() (*corev1.Pod, error) {
}
}

func HeadPodOrNil(t Test, rayCluster *rayv1.RayCluster) func() (*corev1.Pod, error) {
return func() (*corev1.Pod, error) {
return GetHeadPodOrNil(t, rayCluster)
}
}

func GetHeadPod(t Test, rayCluster *rayv1.RayCluster) (*corev1.Pod, error) {
pods, err := t.Client().Core().CoreV1().Pods(rayCluster.Namespace).List(
t.Ctx(),
@@ -149,6 +155,23 @@ func GetHeadPod(t Test, rayCluster *rayv1.RayCluster) (*corev1.Pod, error) {
return &pods.Items[0], nil
}

func GetHeadPodOrNil(t Test, rayCluster *rayv1.RayCluster) (*corev1.Pod, error) {
pods, err := t.Client().Core().CoreV1().Pods(rayCluster.Namespace).List(
t.Ctx(),
common.RayClusterHeadPodsAssociationOptions(rayCluster).ToMetaV1ListOptions(),
)
if err != nil {
return nil, err
}
if len(pods.Items) == 0 {
return nil, nil
}
if len(pods.Items) != 1 {
return nil, errors.New("number of head pods is not 1")
}
return &pods.Items[0], nil
}

func WorkerPods(t Test, rayCluster *rayv1.RayCluster) func() ([]corev1.Pod, error) {
return func() ([]corev1.Pod, error) {
return GetWorkerPods(t, rayCluster)
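The two helpers differ on the empty case, which is what the sidecar-mode test relies on: GetHeadPod treats zero head Pods as an error, while GetHeadPodOrNil returns (nil, nil), so a Gomega poll can assert absence without failing on the error. A short hedged usage sketch:

// Sidecar mode: the head Pod must stay absent, so poll with the nil-tolerant helper.
g.Consistently(HeadPodOrNil(test, rayCluster), TestTimeoutShort, 2*time.Second).Should(BeNil())

// Other submission modes: the head Pod should be recreated, so poll until exactly one exists.
g.Eventually(HeadPod(test, rayCluster), TestTimeoutMedium, 2*time.Second).ShouldNot(BeNil())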