Skip to content

Commit b7b4f1a

Browse files
committed
[kubectl-plugin] Refactor job_submit
Signed-off-by: win5923 <[email protected]>
1 parent c30fae2 commit b7b4f1a

File tree

1 file changed

+45
-60
lines changed

1 file changed

+45
-60
lines changed

kubectl-plugin/pkg/cmd/job/job_submit.go

Lines changed: 45 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616

1717
"github.com/google/shlex"
1818
"github.com/spf13/cobra"
19-
"k8s.io/apimachinery/pkg/api/meta"
2019
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2120
"k8s.io/cli-runtime/pkg/genericclioptions"
2221
"k8s.io/cli-runtime/pkg/genericiooptions"
@@ -36,6 +35,8 @@ const (
3635
dashboardAddr = "http://localhost:8265"
3736
clusterTimeout = 120.0
3837
portForwardTimeout = 60.0
38+
pollInterval = 2
39+
httpTimeout = 5
3940
)
4041

4142
type SubmitJobOptions struct {
@@ -392,40 +393,19 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
392393
if err != nil {
393394
return fmt.Errorf("Failed to get Ray Job status")
394395
}
395-
time.Sleep(2 * time.Second)
396+
time.Sleep(pollInterval * time.Second)
396397
}
397398
options.cluster = options.RayJob.Status.RayClusterName
398399
} else {
399400
return fmt.Errorf("Unknown cluster and did not provide Ray Job. One of the fields must be set")
400401
}
401402

402-
// Wait til the cluster is ready
403-
var clusterReady bool
404-
clusterWaitStartTime := time.Now()
405-
currTime := clusterWaitStartTime
406-
fmt.Printf("Waiting for RayCluster\n")
407-
fmt.Printf("Checking Cluster Status for cluster %s...\n", options.cluster)
408-
for !clusterReady && currTime.Sub(clusterWaitStartTime).Seconds() <= clusterTimeout {
409-
time.Sleep(2 * time.Second)
410-
currCluster, err := k8sClients.RayClient().RayV1().RayClusters(options.namespace).Get(ctx, options.cluster, v1.GetOptions{})
411-
if err != nil {
412-
return fmt.Errorf("Failed to get cluster information with error: %w", err)
413-
}
414-
clusterReady = isRayClusterReady(currCluster)
415-
if !clusterReady {
416-
fmt.Println("Cluster is not ready")
403+
// Wait until the RayCluster reports the condition type=RayClusterProvisioned with status=true
404+
fmt.Printf("Waiting for RayCluster %s to be ready...\n", options.cluster)
405+
if err := k8sClients.WaitRayClusterProvisioned(ctx, options.namespace, options.cluster, time.Duration(clusterTimeout)*time.Second); err != nil {
406+
if cleanupErr := options.cleanupRayJob(ctx, k8sClients); cleanupErr != nil {
407+
return fmt.Errorf("Failed to clean up Ray job after timeout: %w (original error: %w)", cleanupErr, err)
417408
}
418-
currTime = time.Now()
419-
}
420-
421-
if !clusterReady {
422-
fmt.Printf("Deleting RayJob...\n")
423-
err = k8sClients.RayClient().RayV1().RayJobs(options.namespace).Delete(ctx, options.RayJob.GetName(), v1.DeleteOptions{})
424-
if err != nil {
425-
return fmt.Errorf("Failed to clean up Ray job after time out.: %w", err)
426-
}
427-
fmt.Printf("Cleaned Up RayJob: %s\n", options.RayJob.GetName())
428-
429409
return fmt.Errorf("Timed out waiting for cluster")
430410
}
431411

@@ -450,35 +430,9 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
450430
}()
451431

452432
// Wait for port forward to be ready
453-
var portForwardReady bool
454-
portForwardWaitStartTime := time.Now()
455-
currTime = portForwardWaitStartTime
456-
457-
portforwardCheckRequest, err := http.NewRequestWithContext(ctx, http.MethodGet, dashboardAddr, nil)
458-
if err != nil {
459-
return fmt.Errorf("Error occurred when trying to create request to probe cluster endpoint: %w", err)
460-
}
461-
httpClient := http.Client{
462-
Timeout: 5 * time.Second,
463-
}
464433
fmt.Printf("Waiting for port forwarding...")
465-
for !portForwardReady && currTime.Sub(portForwardWaitStartTime).Seconds() <= portForwardTimeout {
466-
time.Sleep(2 * time.Second)
467-
rayDashboardResponse, err := httpClient.Do(portforwardCheckRequest)
468-
if err != nil {
469-
err = fmt.Errorf("Error occurred when waiting for port forwarding: %w", err)
470-
fmt.Println(err)
471-
currTime = time.Now()
472-
continue
473-
}
474-
if rayDashboardResponse.StatusCode >= 200 && rayDashboardResponse.StatusCode < 300 {
475-
portForwardReady = true
476-
}
477-
rayDashboardResponse.Body.Close()
478-
currTime = time.Now()
479-
}
480-
if !portForwardReady {
481-
return fmt.Errorf("Timed out waiting for port forwarding")
434+
if err := waitForPortForward(ctx); err != nil {
435+
return fmt.Errorf("Failed to establish port forwarding: %w", err)
482436
}
483437
options.address = dashboardAddr
484438
fmt.Printf("Port forwarding started on %s\n", options.address)
@@ -722,10 +676,6 @@ func runtimeEnvHasWorkingDir(runtimePath string) (string, error) {
722676
return "", nil
723677
}
724678

725-
func isRayClusterReady(rayCluster *rayv1.RayCluster) bool {
726-
return meta.IsStatusConditionTrue(rayCluster.Status.Conditions, "Ready") || rayCluster.Status.State == rayv1.Ready
727-
}
728-
729679
// Generates a 16-character random ID with a prefix, mimicking Ray Job submission_id.
730680
// ref: ray/python/ray/dashboard/modules/job/job_manager.py
731681
func generateSubmissionID() (string, error) {
@@ -743,3 +693,38 @@ func generateSubmissionID() (string, error) {
743693
}
744694
return fmt.Sprintf("raysubmit_%s", string(idRunes)), nil
745695
}
696+
697+
// waitForPortForward waits for port forwarding to be ready
698+
func waitForPortForward(ctx context.Context) error {
699+
httpClient := http.Client{Timeout: httpTimeout * time.Second}
700+
portforwardCheckRequest, err := http.NewRequestWithContext(ctx, http.MethodGet, dashboardAddr, nil)
701+
if err != nil {
702+
return fmt.Errorf("Error occurred when trying to create request to probe cluster endpoint: %w", err)
703+
}
704+
705+
startTime := time.Now()
706+
for time.Since(startTime).Seconds() <= portForwardTimeout {
707+
time.Sleep(pollInterval * time.Second)
708+
rayDashboardResponse, err := httpClient.Do(portforwardCheckRequest)
709+
if err != nil {
710+
fmt.Printf("Error occurred when waiting for port forwarding: %v\n", err)
711+
continue
712+
}
713+
if rayDashboardResponse.StatusCode >= 200 && rayDashboardResponse.StatusCode < 300 {
714+
rayDashboardResponse.Body.Close()
715+
return nil
716+
}
717+
rayDashboardResponse.Body.Close()
718+
}
719+
return fmt.Errorf("Timed out waiting for port forwarding")
720+
}
721+
722+
func (options *SubmitJobOptions) cleanupRayJob(ctx context.Context, k8sClients client.Client) error {
723+
fmt.Printf("Deleting RayJob...\n")
724+
err := k8sClients.RayClient().RayV1().RayJobs(options.namespace).Delete(ctx, options.RayJob.GetName(), v1.DeleteOptions{})
725+
if err != nil {
726+
return fmt.Errorf("Failed to clean up Ray job: %w", err)
727+
}
728+
fmt.Printf("Cleaned Up RayJob: %s\n", options.RayJob.GetName())
729+
return nil
730+
}

0 commit comments

Comments
 (0)