Skip to content

Commit 93969f7

Browse files
machichimaryanaoleary
authored and committed
feat: add grace period after submitter finished (#4091)
* feat: add grace period after submitter finished * feat: generate new field in rayjob CRD * fix: fix terminate logic + add default timeout value * refactor: remove submitter finish timeout in rayjob status & use default timeout * feat: get finish time from job and container * test: for submitter finished timeout * fix: set finishedAt to nil if submitter not finished * test: ensure timeout close to the set timeout * refactor: make status msg more readable * fix: simplify finishedAt time format * refactor: remove isSubmitterFinished and use finishedAt only * fix: add LastTransitionTime to JobCondition in test * Trigger CI Signed-off-by: machichima <[email protected]> * refactor: fix lint and nit * fix: fix test --------- Signed-off-by: machichima <[email protected]>
1 parent e9a8b23 commit 93969f7

File tree

3 files changed

+149
-26
lines changed

3 files changed

+149
-26
lines changed

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ import (
3434
)
3535

3636
const (
37-
RayJobDefaultRequeueDuration = 3 * time.Second
38-
PythonUnbufferedEnvVarName = "PYTHONUNBUFFERED"
37+
RayJobDefaultRequeueDuration = 3 * time.Second
38+
PythonUnbufferedEnvVarName = "PYTHONUNBUFFERED"
39+
DefaultSubmitterFinishedTimeout = 30 * time.Second
3940
)
4041

4142
// RayJobReconciler reconciles a RayJob object
@@ -258,13 +259,18 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
258259
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
259260
}
260261

261-
var isSubmitterFinished bool
262+
var finishedAt *time.Time
262263
if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode || rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
263264
var shouldUpdate bool
264-
shouldUpdate, isSubmitterFinished, err = r.checkSubmitterAndUpdateStatusIfNeeded(ctx, rayJobInstance)
265+
shouldUpdate, finishedAt, err = r.checkSubmitterAndUpdateStatusIfNeeded(ctx, rayJobInstance)
265266
if err != nil {
266267
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
267268
}
269+
270+
if checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance, finishedAt) {
271+
break
272+
}
273+
268274
if shouldUpdate {
269275
break
270276
}
@@ -288,7 +294,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
288294
}
289295
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
290296
}
291-
if isSubmitterFinished {
297+
// finishedAt will only be set if submitter finished
298+
if finishedAt != nil {
292299
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
293300
rayJobInstance.Status.Reason = rayv1.AppFailed
294301
rayJobInstance.Status.Message = "Submitter completed but Ray job not found in RayCluster."
@@ -306,10 +313,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
306313
jobDeploymentStatus := rayv1.JobDeploymentStatusRunning
307314
reason := rayv1.JobFailedReason("")
308315
isJobTerminal := rayv1.IsJobTerminal(jobInfo.JobStatus)
309-
// If in K8sJobMode, further refine the terminal condition by checking if the submitter Job has finished.
316+
// If in K8sJobMode or SidecarMode, further refine the terminal condition by checking if the submitter has finished.
310317
// See https://github.com/ray-project/kuberay/pull/1919 for reasons.
311318
if utils.HasSubmitter(rayJobInstance) {
312-
isJobTerminal = isJobTerminal && isSubmitterFinished
319+
isJobTerminal = isJobTerminal && finishedAt != nil
313320
}
314321

315322
if isJobTerminal {
@@ -973,10 +980,10 @@ func updateStatusToSuspendingIfNeeded(ctx context.Context, rayJob *rayv1.RayJob)
973980
return true
974981
}
975982

976-
func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) (shouldUpdate, isSubmitterFinished bool, err error) {
983+
func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) (shouldUpdate bool, finishedAt *time.Time, err error) {
977984
logger := ctrl.LoggerFrom(ctx)
978985
shouldUpdate = false
979-
isSubmitterFinished = false
986+
finishedAt = nil
980987
var submitterContainerStatus *corev1.ContainerStatus
981988
var condition *batchv1.JobCondition
982989

@@ -1023,7 +1030,7 @@ func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Con
10231030
}
10241031
}
10251032

1026-
isSubmitterFinished = isSubmitterContainerFinished(headPod)
1033+
finishedAt = getSubmitterContainerFinishedTime(headPod)
10271034
return
10281035
case rayv1.K8sJobMode:
10291036
job := &batchv1.Job{}
@@ -1037,6 +1044,7 @@ func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Con
10371044
logger.Error(err, "Failed to get the submitter Kubernetes Job for RayJob", "NamespacedName", namespacedName)
10381045
return
10391046
}
1047+
10401048
shouldUpdate, condition = checkK8sJobStatus(job)
10411049
if shouldUpdate {
10421050
logger.Info("The submitter Kubernetes Job has failed. Attempting to transition the status to `Failed`.",
@@ -1052,7 +1060,14 @@ func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Con
10521060
rayJob.Status.Message = fmt.Sprintf("Job submission has failed. Reason: %s. Message: %s", condition.Reason, condition.Message)
10531061
}
10541062
}
1055-
_, isSubmitterFinished = utils.IsJobFinished(job)
1063+
1064+
var jobFinishedCondition *batchv1.JobCondition
1065+
// Find the terminal condition to get its LastTransitionTime
1066+
jobFinishedCondition = getJobFinishedCondition(job)
1067+
if jobFinishedCondition != nil && !jobFinishedCondition.LastTransitionTime.IsZero() {
1068+
finishedAt = &jobFinishedCondition.LastTransitionTime.Time
1069+
}
1070+
10561071
return
10571072
default:
10581073
// This means that the KubeRay logic is wrong, and it's better to panic as a system error than to allow the operator to
@@ -1071,6 +1086,15 @@ func checkK8sJobStatus(job *batchv1.Job) (bool, *batchv1.JobCondition) {
10711086
return false, nil
10721087
}
10731088

1089+
func getJobFinishedCondition(job *batchv1.Job) *batchv1.JobCondition {
1090+
for _, condition := range job.Status.Conditions {
1091+
if (condition.Type == batchv1.JobComplete || condition.Type == batchv1.JobFailed) && condition.Status == corev1.ConditionTrue {
1092+
return &condition
1093+
}
1094+
}
1095+
return nil
1096+
}
1097+
10741098
func checkSidecarContainerStatus(headPod *corev1.Pod) (bool, *corev1.ContainerStatus) {
10751099
for _, containerStatus := range headPod.Status.ContainerStatuses {
10761100
if containerStatus.Name == utils.SubmitterContainerName {
@@ -1099,6 +1123,31 @@ func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *ray
10991123
return true
11001124
}
11011125

1126+
func checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob, finishedAt *time.Time) bool {
1127+
logger := ctrl.LoggerFrom(ctx)
1128+
1129+
// Check if timeout is configured and submitter has finished
1130+
if finishedAt == nil {
1131+
return false
1132+
}
1133+
1134+
// Check if timeout has been exceeded
1135+
if time.Now().Before(finishedAt.Add(DefaultSubmitterFinishedTimeout)) {
1136+
return false
1137+
}
1138+
1139+
logger.Info("The RayJob has passed the submitterFinishedTimeoutSeconds. Transition the status to terminal.",
1140+
"SubmitterFinishedTime", finishedAt,
1141+
"SubmitterFinishedTimeoutSeconds", DefaultSubmitterFinishedTimeout.String())
1142+
1143+
rayJob.Status.JobStatus = rayv1.JobStatusFailed
1144+
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
1145+
rayJob.Status.Reason = rayv1.JobDeploymentStatusTransitionGracePeriodExceeded
1146+
rayJob.Status.Message = fmt.Sprintf("The RayJob submitter finished at %v but the ray job did not reach terminal state within %v",
1147+
finishedAt.Format(time.DateTime), DefaultSubmitterFinishedTimeout)
1148+
return true
1149+
}
1150+
11021151
func checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
11031152
logger := ctrl.LoggerFrom(ctx)
11041153
if rayv1.IsJobTerminal(rayJob.Status.JobStatus) && rayJob.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusRunning {
@@ -1129,16 +1178,16 @@ func checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx context.Context, rayJ
11291178
return false
11301179
}
11311180

1132-
func isSubmitterContainerFinished(pod *corev1.Pod) bool {
1181+
func getSubmitterContainerFinishedTime(pod *corev1.Pod) *time.Time {
11331182
for _, containerStatus := range pod.Status.ContainerStatuses {
11341183
if containerStatus.Name == utils.SubmitterContainerName {
11351184
if containerStatus.State.Terminated != nil {
1136-
return true
1185+
return &containerStatus.State.Terminated.FinishedAt.Time
11371186
}
11381187
break
11391188
}
11401189
}
1141-
return false
1190+
return nil
11421191
}
11431192

11441193
// handleDeletionRules processes the DeletionRules with a impact-aware strategy.

ray-operator/controllers/ray/rayjob_controller_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ var _ = Context("RayJob with different submission modes", func() {
290290

291291
// Update the submitter Kubernetes Job to Complete.
292292
conditions := []batchv1.JobCondition{
293-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
293+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
294294
}
295295
job.Status.Conditions = conditions
296296
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -515,7 +515,7 @@ var _ = Context("RayJob with different submission modes", func() {
515515

516516
// Update the submitter Kubernetes Job to Complete.
517517
conditions := []batchv1.JobCondition{
518-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
518+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
519519
}
520520
job.Status.Conditions = conditions
521521
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -680,7 +680,7 @@ var _ = Context("RayJob with different submission modes", func() {
680680

681681
// Update the submitter Kubernetes Job to Complete.
682682
conditions := []batchv1.JobCondition{
683-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
683+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
684684
}
685685
job.Status.Conditions = conditions
686686
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -775,7 +775,7 @@ var _ = Context("RayJob with different submission modes", func() {
775775

776776
// Update the submitter Kubernetes Job to Complete.
777777
conditions := []batchv1.JobCondition{
778-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
778+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
779779
}
780780
job.Status.Conditions = conditions
781781
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -1002,7 +1002,7 @@ var _ = Context("RayJob with different submission modes", func() {
10021002

10031003
// Update the submitter Kubernetes Job to Complete.
10041004
conditions := []batchv1.JobCondition{
1005-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
1005+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
10061006
}
10071007
job.Status.Conditions = conditions
10081008
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -1141,7 +1141,7 @@ var _ = Context("RayJob with different submission modes", func() {
11411141

11421142
// Update the submitter Kubernetes Job to Complete.
11431143
conditions := []batchv1.JobCondition{
1144-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
1144+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
11451145
}
11461146
job.Status.Conditions = conditions
11471147
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -1280,7 +1280,7 @@ var _ = Context("RayJob with different submission modes", func() {
12801280

12811281
// Update the submitter Kubernetes Job to Complete.
12821282
conditions := []batchv1.JobCondition{
1283-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
1283+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
12841284
}
12851285
job.Status.Conditions = conditions
12861286
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -1436,7 +1436,7 @@ var _ = Context("RayJob with different submission modes", func() {
14361436

14371437
// Update the submitter Kubernetes Job to Complete.
14381438
conditions := []batchv1.JobCondition{
1439-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
1439+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
14401440
}
14411441
job.Status.Conditions = conditions
14421442
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -1580,7 +1580,7 @@ var _ = Context("RayJob with different submission modes", func() {
15801580

15811581
// Update the submitter Kubernetes Job to Complete.
15821582
conditions := []batchv1.JobCondition{
1583-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
1583+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
15841584
}
15851585
job.Status.Conditions = conditions
15861586
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -1696,7 +1696,7 @@ var _ = Context("RayJob with different submission modes", func() {
16961696

16971697
// Update the submitter Kubernetes Job to Complete.
16981698
conditions := []batchv1.JobCondition{
1699-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
1699+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
17001700
}
17011701
job.Status.Conditions = conditions
17021702
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -1824,7 +1824,7 @@ var _ = Context("RayJob with different submission modes", func() {
18241824

18251825
// Update the submitter Kubernetes Job to Complete.
18261826
conditions := []batchv1.JobCondition{
1827-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
1827+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
18281828
}
18291829
job.Status.Conditions = conditions
18301830
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
@@ -1985,7 +1985,7 @@ var _ = Context("RayJob with different submission modes", func() {
19851985

19861986
// Update the submitter Kubernetes Job to Complete.
19871987
conditions := []batchv1.JobCondition{
1988-
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
1988+
{Type: batchv1.JobComplete, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()},
19891989
}
19901990
job.Status.Conditions = conditions
19911991
Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())

ray-operator/test/e2erayjob/rayjob_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@ package e2erayjob
33
import (
44
"strings"
55
"testing"
6+
"time"
67

78
. "github.com/onsi/gomega"
9+
"github.com/stretchr/testify/assert"
10+
batchv1 "k8s.io/api/batch/v1"
11+
corev1 "k8s.io/api/core/v1"
812
k8serrors "k8s.io/apimachinery/pkg/api/errors"
913
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1014
"k8s.io/utils/ptr"
1115

1216
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
17+
"github.com/ray-project/kuberay/ray-operator/controllers/ray"
1318
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
1419
rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
1520
. "github.com/ray-project/kuberay/ray-operator/test/support"
@@ -360,4 +365,73 @@ env_vars:
360365
g.Expect(err).NotTo(HaveOccurred())
361366
LogWithTimestamp(test.T(), "Deleted RayJob %s/%s successfully", *rayJobAC.Namespace, *rayJobAC.Name)
362367
})
368+
369+
test.T().Run("RayJob has exceed SubmitterFinishedTimeout", func(_ *testing.T) {
370+
rayJobAC := rayv1ac.RayJob("submitter-timeout", namespace.Name).
371+
WithSpec(rayv1ac.RayJobSpec().
372+
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
373+
WithEntrypoint("python /home/ray/jobs/long_running.py").
374+
WithShutdownAfterJobFinishes(true).
375+
WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration()))
376+
377+
rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
378+
g.Expect(err).NotTo(HaveOccurred())
379+
LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
380+
381+
// Wait until RayJob status and deployment status is running
382+
LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s deployment status to be 'Running'", rayJob.Namespace, rayJob.Name)
383+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
384+
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusRunning)))
385+
LogWithTimestamp(test.T(), "Waiting for Ray job %s/%s to be actually running in Ray cluster", rayJob.Namespace, rayJob.Name)
386+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
387+
Should(WithTransform(RayJobStatus, Equal(rayv1.JobStatusRunning)))
388+
389+
// Wait for the submitter job to be created
390+
LogWithTimestamp(test.T(), "Waiting for submitter job to be created")
391+
g.Eventually(Jobs(test, namespace.Name)).ShouldNot(BeEmpty())
392+
393+
// Force the submitter job to complete by updating its status
394+
// This simulates the submitter finishing but the Ray job still running
395+
LogWithTimestamp(test.T(), "Updating submitter job status to complete")
396+
job, err := test.Client().Core().BatchV1().Jobs(namespace.Name).Get(test.Ctx(), rayJob.Name, metav1.GetOptions{})
397+
g.Expect(err).NotTo(HaveOccurred())
398+
now := metav1.Now()
399+
job.Status.Conditions = append(job.Status.Conditions, batchv1.JobCondition{
400+
Type: batchv1.JobComplete,
401+
Status: corev1.ConditionTrue,
402+
LastProbeTime: now,
403+
LastTransitionTime: now,
404+
Reason: "Completed",
405+
Message: "Job completed successfully for timeout test",
406+
})
407+
job.Status.CompletionTime = &now
408+
job.Status.Succeeded = 1
409+
410+
_, err = test.Client().Core().BatchV1().Jobs(namespace.Name).UpdateStatus(test.Ctx(), job, metav1.UpdateOptions{})
411+
g.Expect(err).NotTo(HaveOccurred())
412+
LogWithTimestamp(test.T(), "Successfully marked submitter job as completed at %v", now.Time)
413+
414+
// Record the start time for timeout measurement
415+
timeoutStartTime := time.Now()
416+
417+
// Wait for the timeout to trigger
418+
LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to exceed SubmitterFinishedTimeout", rayJob.Namespace, rayJob.Name)
419+
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutShort).
420+
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
421+
422+
// Measure the actual timeout duration and verify the timeout duration is close to DefaultSubmitterFinishedTimeout
423+
actualTimeoutDuration := time.Since(timeoutStartTime)
424+
expectedTimeout := ray.DefaultSubmitterFinishedTimeout
425+
assert.InDelta(test.T(), expectedTimeout.Seconds(), actualTimeoutDuration.Seconds(), 5.0,
426+
"Actual timeout duration should be close to DefaultSubmitterFinishedTimeout")
427+
428+
// Get the updated rayJob
429+
rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name)
430+
g.Expect(err).NotTo(HaveOccurred())
431+
432+
reason := rayJob.Status.Reason
433+
message := rayJob.Status.Message
434+
g.Expect(reason).To(Equal(rayv1.JobDeploymentStatusTransitionGracePeriodExceeded))
435+
g.Expect(message).To(MatchRegexp(`The RayJob submitter finished at .* but the ray job did not reach terminal state within .*`))
436+
})
363437
}

0 commit comments

Comments
 (0)