diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go
index 131ee275fbf..111f60d7dc5 100644
--- a/ray-operator/controllers/ray/raycluster_controller.go
+++ b/ray-operator/controllers/ray/raycluster_controller.go
@@ -707,8 +707,8 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 			return errstd.New(reason)
 		}
 	} else if len(headPods.Items) == 0 {
-		originatedFrom := utils.GetCRDType(instance.Labels[utils.RayOriginatedFromCRDLabelKey])
-		if originatedFrom == utils.RayJobCRD {
+		if meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterProvisioned)) &&
+			shouldSkipHeadPodRestart(instance) {
 			// Recreating the head Pod if the RayCluster created by RayJob is provisioned doesn't help RayJob.
 			//
 			// Case 1: GCS fault tolerance is disabled
@@ -720,13 +720,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 			//
 			// In this case, the worker Pods will not be killed by the new head Pod when it is created, but the submission ID has already been
 			// used by the old Ray job, so the new Ray job will fail.
-			if meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterProvisioned)) {
-				logger.Info(
-					"reconcilePods: Found 0 head Pods for a RayJob-managed RayCluster; skipping head creation to let RayJob controller handle the failure",
-					"rayCluster", instance.Name,
-				)
-				return nil
-			}
+			logger.Info(
+				"reconcilePods: Found 0 head Pods for the RayCluster; Skipped head recreation due to ray.io/disable-provisioned-head-restart",
+				"rayCluster", instance.Name,
+			)
+			return nil
 		}
 		// Create head Pod if it does not exist.
 		logger.Info("reconcilePods: Found 0 head Pods; creating a head Pod for the RayCluster.")
@@ -858,7 +856,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 					}
 				}
 			}
-			logger.Info("reconcilePods", "found existing replica indices", "group", worker.GroupName, "indices", validReplicaIndices)
+			logger.Info("reconcilePods: found existing replica indices", "group", worker.GroupName, "indices", validReplicaIndices)
 		}
 		if diff > 0 {
 			// pods need to be added
@@ -1122,6 +1120,10 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context
 	return nil
 }
 
+func shouldSkipHeadPodRestart(instance *rayv1.RayCluster) bool {
+	return instance.Annotations[utils.DisableProvisionedHeadRestartAnnotationKey] == "true"
+}
+
 // shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on RayClusterSpec changes
 func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool {
 	logger := ctrl.LoggerFrom(ctx)
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index 70a5767e4c8..441af01edd9 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -905,6 +905,7 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
 		if err != nil {
 			return nil, err
 		}
+
 		if r.options.BatchSchedulerManager != nil && rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
 			if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
 				// Group name is only used for individual pods to specify their task group ("headgroup", "worker-group-1", etc.).
@@ -939,10 +940,18 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.Ra
 	maps.Copy(labels, rayJobInstance.Labels)
 	labels[utils.RayOriginatedFromCRNameLabelKey] = rayJobInstance.Name
 	labels[utils.RayOriginatedFromCRDLabelKey] = utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD)
+	labels[utils.RayJobSubmissionModeLabelKey] = string(rayJobInstance.Spec.SubmissionMode)
+
+	annotations := make(map[string]string, len(rayJobInstance.Annotations))
+	maps.Copy(annotations, rayJobInstance.Annotations)
+	if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
+		annotations[utils.DisableProvisionedHeadRestartAnnotationKey] = "true"
+	}
+
 	rayCluster := &rayv1.RayCluster{
 		ObjectMeta: metav1.ObjectMeta{
 			Labels:      labels,
-			Annotations: rayJobInstance.Annotations,
+			Annotations: annotations,
 			Name:        rayClusterName,
 			Namespace:   rayJobInstance.Namespace,
 		},
diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go
index a4336a6db99..e41ce1b10ee 100644
--- a/ray-operator/controllers/ray/utils/constant.go
+++ b/ray-operator/controllers/ray/utils/constant.go
@@ -29,6 +29,9 @@ const (
 	KubeRayVersion                   = "ray.io/kuberay-version"
 	RayCronJobNameLabelKey           = "ray.io/cronjob-name"
 	RayCronJobTimestampAnnotationKey = "ray.io/cronjob-scheduled-timestamp"
+	RayJobSubmissionModeLabelKey     = "ray.io/job-submission-mode"
+	// DisableProvisionedHeadRestartAnnotationKey marks RayClusters created for sidecar-mode RayJobs to skip head Pod recreation after provisioning.
+	DisableProvisionedHeadRestartAnnotationKey = "ray.io/disable-provisioned-head-restart"
 
 	// Labels for feature RayMultihostIndexing
 	//
diff --git a/ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go b/ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go
index d3f3ed6058a..62f6a3c7d33 100644
--- a/ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go
+++ b/ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go
@@ -2,6 +2,7 @@ package e2erayjob
 
 import (
 	"testing"
+	"time"
 
 	. "github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
@@ -179,12 +180,24 @@ env_vars:
 		g.Expect(err).NotTo(HaveOccurred())
 		rayCluster, err := GetRayCluster(test, rayJob.Namespace, rayJob.Status.RayClusterName)
 		g.Expect(err).NotTo(HaveOccurred())
+		g.Expect(rayCluster.Labels[utils.RayJobSubmissionModeLabelKey]).To(Equal(string(rayv1.SidecarMode)))
+		g.Expect(rayCluster.Annotations[utils.DisableProvisionedHeadRestartAnnotationKey]).To(Equal("true"))
 		headPod, err := GetHeadPod(test, rayCluster)
 		g.Expect(err).NotTo(HaveOccurred())
 
 		LogWithTimestamp(test.T(), "Deleting head Pod %s/%s for RayCluster %s", headPod.Namespace, headPod.Name, rayCluster.Name)
 		err = test.Client().Core().CoreV1().Pods(headPod.Namespace).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
 		g.Expect(err).NotTo(HaveOccurred())
+		// Head pod should NOT be recreated for sidecar modes.
+		g.Eventually(func() error {
+			_, err := GetHeadPod(test, rayCluster)
+			return err
+		}, TestTimeoutMedium, 2*time.Second).Should(HaveOccurred())
+		g.Consistently(func() error {
+			_, err := GetHeadPod(test, rayCluster)
+			return err
+		}, TestTimeoutShort, 2*time.Second).Should(HaveOccurred())
+
 		// After head pod deletion, controller should mark RayJob as Failed with a specific message
 		g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
 			Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
diff --git a/ray-operator/test/e2erayjob/rayjob_test.go b/ray-operator/test/e2erayjob/rayjob_test.go
index 617510718ec..cb712a24fc7 100644
--- a/ray-operator/test/e2erayjob/rayjob_test.go
+++ b/ray-operator/test/e2erayjob/rayjob_test.go
@@ -279,7 +279,7 @@ env_vars:
 			To(WithTransform(RayJobReason, Equal(rayv1.DeadlineExceeded)))
 	})
 
-	test.T().Run("RayJob fails when head Pod is deleted when job is running", func(_ *testing.T) {
+	test.T().Run("RayJob controller recreates the head Pod if it is deleted while the job is running", func(_ *testing.T) {
 		rayJobAC := rayv1ac.RayJob("delete-head-after-submit", namespace.Name).
 			WithSpec(rayv1ac.RayJobSpec().
 				WithRayClusterSpec(NewRayClusterSpec()).
@@ -300,17 +300,23 @@ env_vars:
 		g.Expect(err).NotTo(HaveOccurred())
 		rayCluster, err := GetRayCluster(test, rayJob.Namespace, rayJob.Status.RayClusterName)
 		g.Expect(err).NotTo(HaveOccurred())
+		g.Expect(rayCluster.Labels[utils.RayJobSubmissionModeLabelKey]).To(Equal(string(rayv1.K8sJobMode)))
+		g.Expect(rayCluster.Annotations[utils.DisableProvisionedHeadRestartAnnotationKey]).To(Equal(""))
 		headPod, err := GetHeadPod(test, rayCluster)
 		g.Expect(err).NotTo(HaveOccurred())
 
 		LogWithTimestamp(test.T(), "Deleting head Pod %s/%s for RayCluster %s", headPod.Namespace, headPod.Name, rayCluster.Name)
 		err = test.Client().Core().CoreV1().Pods(headPod.Namespace).Delete(test.Ctx(), headPod.Name, metav1.DeleteOptions{})
 		g.Expect(err).NotTo(HaveOccurred())
-		// After head pod deletion, controller should mark RayJob as Failed with a specific message
+		// Head pod should be recreated for non-sidecar modes.
+		g.Eventually(func() (*corev1.Pod, error) {
+			return GetHeadPod(test, rayCluster)
+		}, TestTimeoutMedium, 2*time.Second).ShouldNot(BeNil())
 		g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
 			Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
 		g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
 			Should(WithTransform(RayJobReason, Or(
+				Equal(rayv1.AppFailed),
 				Equal(rayv1.JobDeploymentStatusTransitionGracePeriodExceeded),
 				Equal(rayv1.SubmissionFailed),
 			)))
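
For reviewers, a minimal, hypothetical table-driven unit test (not part of this diff) sketching how the new shouldSkipHeadPodRestart helper is expected to read the ray.io/disable-provisioned-head-restart annotation: only the exact string "true" opts a RayCluster out of head Pod recreation. It assumes the test file sits next to raycluster_controller.go in the same ray package and that the import paths match the kuberay module layout.

package ray

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// Sketch only: verifies that just the exact value "true" on the
// ray.io/disable-provisioned-head-restart annotation skips head Pod recreation.
func TestShouldSkipHeadPodRestart(t *testing.T) {
	tests := []struct {
		name        string
		annotations map[string]string
		want        bool
	}{
		{name: "annotation set to true", annotations: map[string]string{utils.DisableProvisionedHeadRestartAnnotationKey: "true"}, want: true},
		{name: "annotation set to false", annotations: map[string]string{utils.DisableProvisionedHeadRestartAnnotationKey: "false"}, want: false},
		{name: "annotation absent", annotations: nil, want: false},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			cluster := &rayv1.RayCluster{ObjectMeta: metav1.ObjectMeta{Annotations: tc.annotations}}
			if got := shouldSkipHeadPodRestart(cluster); got != tc.want {
				t.Errorf("shouldSkipHeadPodRestart() = %v, want %v", got, tc.want)
			}
		})
	}
}

Note that even with the annotation set, the controller only skips head recreation once the RayClusterProvisioned condition is true, so a cluster that never finished provisioning still gets its head Pod created.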