diff --git a/historyserver/config/rayjob.yaml b/historyserver/config/rayjob.yaml index 6741b0ee898..3ce3619364a 100644 --- a/historyserver/config/rayjob.yaml +++ b/historyserver/config/rayjob.yaml @@ -6,6 +6,7 @@ spec: entrypoint: | python -c " import ray + import time ray.init() @ray.remote(num_cpus=0.5) @@ -35,6 +36,10 @@ spec: final_count = ray.get(counter.get_count.remote()) print(f'Final count: {final_count}') print(f'Cluster resources: {ray.cluster_resources()}') + + # For now, events on the worker nodes aren't sent to the collector when calling ray.shutdown(). + # As a workaround, we explicitly wait for 5 seconds to ensure events are sent. + time.sleep(5) " # Select the existing Ray cluster running the collector. clusterSelector: diff --git a/historyserver/pkg/eventserver/types/event.go b/historyserver/pkg/eventserver/types/event.go index 257ed3373cf..e38616fdedb 100644 --- a/historyserver/pkg/eventserver/types/event.go +++ b/historyserver/pkg/eventserver/types/event.go @@ -1,7 +1,10 @@ package types +// EventType is the Ray event type. type EventType string +// There are 11 potential Ray event types: +// https://github.com/ray-project/ray/blob/3b41c97fa90c58b0b72c0026f57005b92310160d/src/ray/protobuf/public/events_base_event.proto#L49-L61 const ( EVENT_TYPE_UNSPECIFIED EventType = "EVENT_TYPE_UNSPECIFIED" TASK_DEFINITION_EVENT EventType = "TASK_DEFINITION_EVENT" @@ -15,3 +18,18 @@ const ( ACTOR_DEFINITION_EVENT EventType = "ACTOR_DEFINITION_EVENT" ACTOR_LIFECYCLE_EVENT EventType = "ACTOR_LIFECYCLE_EVENT" ) + +// AllEventTypes includes all potential event types defined in Ray. +var AllEventTypes = []EventType{ + EVENT_TYPE_UNSPECIFIED, + TASK_DEFINITION_EVENT, + TASK_LIFECYCLE_EVENT, + ACTOR_TASK_DEFINITION_EVENT, + TASK_PROFILE_EVENT, + DRIVER_JOB_DEFINITION_EVENT, + DRIVER_JOB_LIFECYCLE_EVENT, + NODE_DEFINITION_EVENT, + NODE_LIFECYCLE_EVENT, + ACTOR_DEFINITION_EVENT, + ACTOR_LIFECYCLE_EVENT, +} diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go index 0139fbd04bf..cc67b5a211c 100644 --- a/historyserver/test/e2e/collector_test.go +++ b/historyserver/test/e2e/collector_test.go @@ -2,6 +2,7 @@ package e2e import ( "context" + "encoding/json" "fmt" "os/exec" "path/filepath" @@ -17,9 +18,8 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + "github.com/ray-project/kuberay/historyserver/pkg/eventserver/types" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" - "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" - rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" . "github.com/ray-project/kuberay/ray-operator/test/support" ) @@ -35,8 +35,18 @@ const ( // Ray cluster rayClusterManifestPath = "../../config/raycluster.yaml" rayClusterID = "default" + + // Ray job + rayJobManifestPath = "../../config/rayjob.yaml" ) +// rayEvent contains specific fields in the Ray event JSON schema. For now, we keep only two fields, +// eventId and eventType while ensuring future extensibility (e.g., sessionName, timestamp, sourceType, etc.). +type rayEvent struct { + EventID string `json:"eventId"` + EventType string `json:"eventType"` +} + func TestCollector(t *testing.T) { // Share a single S3 client among subtests. 
s3Client := ensureS3Client(t) @@ -70,27 +80,32 @@ func TestCollector(t *testing.T) { } } -// testCollectorUploadOnGracefulShutdown verifies that logs and node_events are successfully uploaded to S3 on cluster deletion. +// testCollectorUploadOnGracefulShutdown verifies that logs, node_events, and job_events are successfully uploaded to S3 on cluster deletion. // // The test case follows these steps: // 1. Prepare test environment by applying a Ray cluster with the collector // 2. Submit a Ray job to the existing Ray cluster // 3. Get the sessionID and nodeID for further verification // 4. Delete the Ray cluster to trigger log uploading and event flushing on deletion. When the Ray cluster is deleted, -// logs and node_events are processed as follows: +// logs, node_events, and job_events are processed as follows: // - logs: Trigger RayLogHandler.processSessionLatestLog to process logs under /tmp/ray/session_latest -// - node_events: Trigger EventServer.flushEvents to process in-memory events +// - node_events: Trigger EventServer.flushEvents, which calls es.flushNodeEventsForHour to process in-memory node events +// - job_events: Trigger EventServer.flushEvents, which calls es.flushJobEventsForHour to process in-memory job events // -// 5. Verify logs and node_events are successfully uploaded to S3. Expected S3 path structure: +// 5. Verify logs, node_events, and job_events are successfully uploaded to S3. Expected S3 path structure: // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/logs/... // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/node_events/... +// - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AgAAAA==/... +// - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AQAAAA==/... +// +// For detailed verification logic, please refer to verifyS3SessionDirs. // // 6. Delete S3 bucket to ensure test isolation func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) { rayCluster := prepareTestEnv(test, g, namespace, s3Client) // Submit a Ray job to the existing cluster. - _ = applyRayJobToCluster(test, g, namespace, rayCluster) + _ = applyRayJobAndWaitForCompletion(test, g, namespace) clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) sessionID := getSessionIDFromHeadPod(test, g, rayCluster) @@ -107,8 +122,8 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev return err }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) - // Verify logs and node_events are successfully uploaded to S3. - verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID, false) + // Verify logs, node_events, and job_events are successfully uploaded to S3. + verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) // Delete S3 bucket to ensure test isolation. deleteS3Bucket(test, g, s3Client) @@ -136,7 +151,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 rayCluster := prepareTestEnv(test, g, namespace, s3Client) // Submit a Ray job to the existing cluster. 
- _ = applyRayJobToCluster(test, g, namespace, rayCluster) + _ = applyRayJobAndWaitForCompletion(test, g, namespace) clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) sessionID := getSessionIDFromHeadPod(test, g, rayCluster) @@ -149,23 +164,8 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 // Since Kubernetes 1.28 (with cgroups v2 enabled), `memory.oom.group` is enabled by default: when any process in a cgroup // hits the memory limit, all processes in the container are killed together, thereby triggering container restart. // For more details, please refer to https://github.com/kubernetes/kubernetes/pull/117793 - LogWithTimestamp(test.T(), "Killing main process of ray-head container to trigger a restart") - g.Eventually(func(gg Gomega) { - headPod, err := GetHeadPod(test, rayCluster) - gg.Expect(err).NotTo(HaveOccurred()) - _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"kill", "1"}) - gg.Expect(stderr.String()).To(BeEmpty()) - }, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of ray-head container") - - LogWithTimestamp(test.T(), "Waiting for ray-head container to restart and become ready") - g.Eventually(func(gg Gomega) { - updatedPod, err := GetHeadPod(test, rayCluster) - gg.Expect(err).NotTo(HaveOccurred()) - rayHeadStatus, err := getContainerStatusByName(updatedPod, "ray-head") - gg.Expect(err).NotTo(HaveOccurred()) - gg.Expect(rayHeadStatus.RestartCount).To(BeNumerically(">", 0)) - gg.Expect(rayHeadStatus.Ready).To(BeTrue()) - }, TestTimeoutShort).Should(Succeed(), "ray-head container should restart and become ready") + killContainerAndWaitForRestart(test, g, HeadPod(test, rayCluster), "ray-head") + killContainerAndWaitForRestart(test, g, FirstWorkerPod(test, rayCluster), "ray-worker") // Verify the old session logs have been processed on disk. dirs := []string{"prev-logs", "persist-complete-logs"} @@ -182,8 +182,8 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 }, TestTimeoutMedium).Should(Succeed(), "Session directory %s should exist in %s", sessionID, dirPath) } - // Verify logs and node_events are successfully uploaded to S3. - verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID, false) + // Verify logs, node_events, and job_events are successfully uploaded to S3. + verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) deleteS3Bucket(test, g, s3Client) } @@ -254,10 +254,6 @@ func testCollectorResumesUploadsOnRestart(test Test, g *WithT, namespace *corev1 gg.Expect(cs.Ready).To(BeTrue()) }, TestTimeoutMedium).Should(Succeed()) - // Verify S3 uploads using the existing verifyS3SessionDirs helper. - // Skip node_events verification since prev-logs processing only handles logs directory. - LogWithTimestamp(test.T(), "Verifying scanning logic: checking S3 for recovered files") - // Verify that file2.log was actually uploaded to S3. // file1.log should NOT be uploaded because it was already marked as "completed" in persist-complete-logs. // file2.log should be uploaded because it was in prev-logs (pending upload). @@ -477,16 +473,15 @@ func applyRayCluster(test Test, g *WithT, namespace *corev1.Namespace) *rayv1.Ra return rayCluster } -// applyRayJobToCluster applies a Ray job to the existing Ray cluster. 
-func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayCluster *rayv1.RayCluster) *rayv1.RayJob {
-	jobScript := "import ray; ray.init(); print(ray.cluster_resources())"
-	rayJobAC := rayv1ac.RayJob("ray-job", namespace.Name).
-		WithSpec(rayv1ac.RayJobSpec().
-			WithClusterSelector(map[string]string{utils.RayClusterLabelKey: rayCluster.Name}).
-			WithEntrypoint(fmt.Sprintf("python -c %q", jobScript)).
-			WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration()))
+// applyRayJobAndWaitForCompletion applies a Ray job to the existing Ray cluster and waits for it to complete successfully.
+// In the RayJob manifest, the clusterSelector is set to the existing Ray cluster, raycluster-historyserver.
+func applyRayJobAndWaitForCompletion(test Test, g *WithT, namespace *corev1.Namespace) *rayv1.RayJob {
+	rayJobFromYaml := DeserializeRayJobYAML(test, rayJobManifestPath)
+	rayJobFromYaml.Namespace = namespace.Name
 
-	rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+	rayJob, err := test.Client().Ray().RayV1().
+		RayJobs(namespace.Name).
+		Create(test.Ctx(), rayJobFromYaml, metav1.CreateOptions{})
 	g.Expect(err).NotTo(HaveOccurred())
 	LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
 
@@ -501,17 +496,20 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC
 	return rayJob
 }
 
-// verifyS3SessionDirs verifies that directories logs/ and node_events/ exist under a session prefix in S3.
-// This helper function checks that each directory contains at least one object.
-// Additionally, it verifies that specific files have content:
-//   - logs/<nodeID>/raylet.out must exist and have content > 0 bytes
-//   - node_events/<nodeID>_<suffix> must exist and have content > 0 bytes (suffix can be ignored for verification)
-// If skipNodeEvents is true, node_events directory verification will be skipped.
-func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string, skipNodeEvents bool) {
+// verifyS3SessionDirs verifies file contents in the logs/, node_events/, and job_events/ directories under a session prefix in S3.
+// There are two phases of verification:
+//  1. Verify file contents in the logs/ directory
+//     - logs/<nodeID>/raylet.out must exist and have content > 0 bytes
+//     - TODO(jwj): Complete docs.
+//
+//  2. Verify event type coverage in the node_events/ and job_events/ directories
+//     - Aggregate all events from the node_events/ and job_events/ directories
+//     - Verify that all potential event types are present in the aggregated events
+//
+// NOTE: Since how node and job events are distributed across the flushed files is nondeterministic, we aggregate all of them first and only then verify event type coverage.
+func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) {
+	// TODO(jwj): Separate verification for logs and events.
 	dirs := []string{"logs"}
-	if !skipNodeEvents {
-		dirs = append(dirs, "node_events")
-	}
 
 	for _, dir := range dirs {
 		dirPrefix := sessionPrefix + dir + "/"
@@ -550,6 +548,34 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str
 			LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize)
 		}, TestTimeoutMedium).Should(Succeed(), "Failed to verify at least one object in directory %s has content", dirPrefix)
 	}
+
+	// Verify event type coverage in node_events/ and job_events/ directories.
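+	// For example, the NODE_DEFINITION_EVENT and NODE_LIFECYCLE_EVENT entries are expected to come from
+	// node_events/, while the per-job directories under job_events/ (e.g., AQAAAA== and AgAAAA== in this
+	// test) carry the task, actor, and driver job events; only the union of everything loaded below has
+	// to cover types.AllEventTypes, minus EVENT_TYPE_UNSPECIFIED.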
+	LogWithTimestamp(test.T(), "Verifying all %d event types are covered, except for EVENT_TYPE_UNSPECIFIED: %v", len(types.AllEventTypes)-1, types.AllEventTypes)
+	uploadedEvents := []rayEvent{}
+
+	// Load events from node_events directory.
+	nodeEventsPrefix := sessionPrefix + "node_events/"
+	nodeEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, nodeEventsPrefix)
+	g.Expect(err).NotTo(HaveOccurred())
+	uploadedEvents = append(uploadedEvents, nodeEvents...)
+	LogWithTimestamp(test.T(), "Loaded %d events from node_events", len(nodeEvents))
+
+	// Dynamically discover and load events from job_events directories.
+	jobEventsPrefix := sessionPrefix + "job_events/"
+	jobDirs, err := listS3Directories(s3Client, s3BucketName, jobEventsPrefix)
+	g.Expect(err).NotTo(HaveOccurred())
+	g.Expect(jobDirs).NotTo(BeEmpty())
+	LogWithTimestamp(test.T(), "Found %d job directories: %v", len(jobDirs), jobDirs)
+
+	for _, jobDir := range jobDirs {
+		jobDirPrefix := jobEventsPrefix + jobDir + "/"
+		jobEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, jobDirPrefix)
+		g.Expect(err).NotTo(HaveOccurred())
+		uploadedEvents = append(uploadedEvents, jobEvents...)
+		LogWithTimestamp(test.T(), "Loaded %d events from job_events/%s", len(jobEvents), jobDir)
+	}
+
+	assertAllEventTypesCovered(test, g, uploadedEvents)
 }
 
 // getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink
@@ -594,7 +620,42 @@
 fi`
 
 	g.Expect(nodeID).NotTo(BeEmpty(), "nodeID should not be empty")
 	return nodeID
+}
+// FirstWorkerPod returns a function that gets the first worker pod from the Ray cluster.
+// It adapts GetWorkerPods so it can be passed to helpers, such as killContainerAndWaitForRestart, that expect a single-pod getter.
+func FirstWorkerPod(test Test, rayCluster *rayv1.RayCluster) func() (*corev1.Pod, error) {
+	return func() (*corev1.Pod, error) {
+		workerPods, err := GetWorkerPods(test, rayCluster)
+		if err != nil {
+			return nil, err
+		}
+		if len(workerPods) == 0 {
+			return nil, fmt.Errorf("no worker pods found")
+		}
+		return &workerPods[0], nil
+	}
+}
+
+// killContainerAndWaitForRestart kills the main process of a container and waits for the container to restart and become ready.
+func killContainerAndWaitForRestart(test Test, g *WithT, getPod func() (*corev1.Pod, error), containerName string) {
+	LogWithTimestamp(test.T(), "Killing main process of %s container to trigger a restart", containerName)
+	g.Eventually(func(gg Gomega) {
+		pod, err := getPod()
+		gg.Expect(err).NotTo(HaveOccurred())
+		_, stderr := ExecPodCmd(test, pod, containerName, []string{"kill", "1"})
+		gg.Expect(stderr.String()).To(BeEmpty())
+	}, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of %s container", containerName)
+
+	LogWithTimestamp(test.T(), "Waiting for %s container to restart and become ready", containerName)
+	g.Eventually(func(gg Gomega) {
+		pod, err := getPod()
+		gg.Expect(err).NotTo(HaveOccurred())
+		containerStatus, err := getContainerStatusByName(pod, containerName)
+		gg.Expect(err).NotTo(HaveOccurred())
+		gg.Expect(containerStatus.RestartCount).To(BeNumerically(">", 0))
+		gg.Expect(containerStatus.Ready).To(BeTrue())
+	}, TestTimeoutShort).Should(Succeed(), "%s container should restart and become ready", containerName)
 }
 
 // getContainerStatusByName retrieves the container status by container name.
@@ -609,3 +670,90 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co } return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name) } + +// listS3Directories lists all directories (prefixes) under the given S3 prefix. +// In S3, directories are simulated using prefixes and delimiters. +// For example, given prefix "log/cluster/session/job_events/", this function returns ["AgAAAA==", "AQAAAA=="] +// which are the jobID directories under job_events/. +func listS3Directories(s3Client *s3.S3, bucket string, prefix string) ([]string, error) { + result, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + Delimiter: aws.String("/"), + }) + if err != nil { + return nil, fmt.Errorf("failed to list S3 directories under %s: %w", prefix, err) + } + + // Extract directory names from CommonPrefixes. + var directories []string + for _, commonPrefix := range result.CommonPrefixes { + fullPrefix := aws.StringValue(commonPrefix.Prefix) + // Extract the directory name by removing the parent prefix and trailing slash. + // Example: "log/cluster/session/job_events/AgAAAA==/" -> "AgAAAA==" + dirName := strings.TrimPrefix(fullPrefix, prefix) + dirName = strings.TrimSuffix(dirName, "/") + if dirName != "" { + directories = append(directories, dirName) + } + } + + return directories, nil +} + +// loadRayEventsFromS3 loads Ray events from S3. +func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEvent, error) { + var events []rayEvent + + // List all file objects in the directory. + objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + Prefix: aws.String(prefix), + }) + if err != nil { + return nil, err + } + + for _, obj := range objects.Contents { + fileKey := aws.StringValue(obj.Key) + if strings.HasSuffix(fileKey, "/") { + continue + } + + // Get the file object content and decode it into Ray events. + content, err := s3Client.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(fileKey), + }) + if err != nil { + return nil, err + } + + var fileEvents []rayEvent + if err := json.NewDecoder(content.Body).Decode(&fileEvents); err != nil { + content.Body.Close() + return nil, fmt.Errorf("failed to decode Ray events from %s: %w", fileKey, err) + } + content.Body.Close() + + events = append(events, fileEvents...) + } + + return events, nil +} + +// assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3. +// NOTE: EVENT_TYPE_UNSPECIFIED is excluded from verification. +func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { + foundEventTypes := map[string]bool{} + for _, event := range events { + foundEventTypes[event.EventType] = true + } + + for _, eventType := range types.AllEventTypes { + if eventType == types.EVENT_TYPE_UNSPECIFIED { + continue + } + g.Expect(foundEventTypes[string(eventType)]).To(BeTrue(), "Event type %s not found", eventType) + } +}
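+
+// For reference, loadRayEventsFromS3 expects each uploaded events file to decode into a JSON array of
+// objects carrying at least the two fields mirrored by rayEvent; an illustrative payload (values not
+// taken from a real run) would look like:
+//
+//	[
+//	  {"eventId": "a1b2c3", "eventType": "TASK_DEFINITION_EVENT"},
+//	  {"eventId": "d4e5f6", "eventType": "NODE_LIFECYCLE_EVENT"}
+//	]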