diff --git a/historyserver/test/e2e/collector_test.go b/historyserver/test/e2e/collector_test.go
index cc67b5a211c..9c9c8df2519 100644
--- a/historyserver/test/e2e/collector_test.go
+++ b/historyserver/test/e2e/collector_test.go
@@ -107,9 +107,11 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev
 	// Submit a Ray job to the existing cluster.
 	_ = applyRayJobAndWaitForCompletion(test, g, namespace)
 
+	// Define variables for constructing S3 object prefix.
 	clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID)
 	sessionID := getSessionIDFromHeadPod(test, g, rayCluster)
-	nodeID := getNodeIDFromHeadPod(test, g, rayCluster)
+	headNodeID := getNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head")
+	workerNodeID := getNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker")
 	sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, sessionID)
 
 	// Delete the Ray cluster to trigger log uploading and event flushing on deletion.
@@ -123,7 +125,7 @@
 	}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))
 
 	// Verify logs, node_events, and job_events are successfully uploaded to S3.
-	verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID)
+	verifyS3SessionDirs(test, g, s3Client, sessionPrefix, headNodeID, workerNodeID)
 
 	// Delete S3 bucket to ensure test isolation.
 	deleteS3Bucket(test, g, s3Client)
@@ -155,7 +157,8 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1
 
 	clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID)
 	sessionID := getSessionIDFromHeadPod(test, g, rayCluster)
-	nodeID := getNodeIDFromHeadPod(test, g, rayCluster)
+	headNodeID := getNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head")
+	workerNodeID := getNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker")
 	sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, sessionID)
 
 	// NOTE: We use `kill 1` to simulate Kubernetes OOMKilled behavior.
@@ -183,7 +186,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1
 	}
 
 	// Verify logs, node_events, and job_events are successfully uploaded to S3.
-	verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID)
+	verifyS3SessionDirs(test, g, s3Client, sessionPrefix, headNodeID, workerNodeID)
 
 	deleteS3Bucket(test, g, s3Client)
 }
@@ -499,83 +502,56 @@ func applyRayJobAndWaitForCompletion(test Test, g *WithT, namespace *corev1.Name
 // verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3.
 // There are two phases of verification:
 // 1. Verify file contents in logs/ directory
-//   - logs/<nodeID>/raylet.out must exist and have content > 0 bytes
-//   - TODO(jwj): Complete docs.
+//   - For the head node, verify raylet.out, gcs_server.out, and monitor.out exist
+//   - For the worker node, verify raylet.out exists
 //
 // 2. Verify event type coverage in node_events/ and job_events/ directories
 //   - Aggregate all events from node_events/ and job_events/ directories
 //   - Verify that all potential event types are present in the aggregated events
 //
 // NOTE: Since flushed node and job events are nondeterministic, we need to aggregate them first before verifying event type coverage.
-func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) {
-	// TODO(jwj): Separate verification for logs and events.
-	dirs := []string{"logs"}
-	for _, dir := range dirs {
-		dirPrefix := sessionPrefix + dir + "/"
-
-		g.Eventually(func(gg Gomega) {
-			// Verify the directory has at least one object.
-			objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
-				Bucket:  aws.String(s3BucketName),
-				Prefix:  aws.String(dirPrefix),
-				MaxKeys: aws.Int64(10),
-			})
-			gg.Expect(err).NotTo(HaveOccurred())
-			keyCount := aws.Int64Value(objects.KeyCount)
-			gg.Expect(keyCount).To(BeNumerically(">", 0))
-			LogWithTimestamp(test.T(), "Verified directory %s under %s has at least one object", dir, sessionPrefix)
-
-			// Find the first file object for content verification.
-			var fileObj *s3.Object
-			for _, obj := range objects.Contents {
-				if !strings.HasSuffix(aws.StringValue(obj.Key), "/") {
-					fileObj = obj
-					break
-				}
-			}
-			gg.Expect(fileObj).NotTo(BeNil(), "No file object found in directory %s", dirPrefix)
-
-			// Verify the file has content by checking file size.
-			fileKey := *fileObj.Key
-			LogWithTimestamp(test.T(), "Checking file: %s", fileKey)
-			obj, err := s3Client.HeadObject(&s3.HeadObjectInput{
-				Bucket: aws.String(s3BucketName),
-				Key:    aws.String(fileKey),
-			})
-			gg.Expect(err).NotTo(HaveOccurred())
-			fileSize := aws.Int64Value(obj.ContentLength)
-			gg.Expect(fileSize).To(BeNumerically(">", 0))
-			LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize)
-		}, TestTimeoutMedium).Should(Succeed(), "Failed to verify at least one object in directory %s has content", dirPrefix)
+func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, headNodeID string, workerNodeID string) {
+	// Verify file contents in logs/ directory.
+	headLogDirPrefix := fmt.Sprintf("%slogs/%s", sessionPrefix, headNodeID)
+	workerLogDirPrefix := fmt.Sprintf("%slogs/%s", sessionPrefix, workerNodeID)
+
+	LogWithTimestamp(test.T(), "Verifying raylet.out, gcs_server.out, and monitor.out exist in head log directory %s", headLogDirPrefix)
+	for _, fileName := range []string{"raylet.out", "gcs_server.out", "monitor.out"} {
+		assertFileExist(test, g, s3Client, headLogDirPrefix, fileName)
 	}
 
+	LogWithTimestamp(test.T(), "Verifying raylet.out exists in worker log directory %s", workerLogDirPrefix)
+	assertFileExist(test, g, s3Client, workerLogDirPrefix, "raylet.out")
+
 	// Verify event type coverage in node_events/ and job_events/ directories.
 	LogWithTimestamp(test.T(), "Verifying all %d event types are covered, except for EVENT_TYPE_UNSPECIFIED: %v", len(types.AllEventTypes)-1, types.AllEventTypes)
-	uploadedEvents := []rayEvent{}
+	g.Eventually(func(gg Gomega) {
+		uploadedEvents := []rayEvent{}
 
-	// Load events from node_events directory.
-	nodeEventsPrefix := sessionPrefix + "node_events/"
-	nodeEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, nodeEventsPrefix)
-	g.Expect(err).NotTo(HaveOccurred())
-	uploadedEvents = append(uploadedEvents, nodeEvents...)
-	LogWithTimestamp(test.T(), "Loaded %d events from node_events", len(nodeEvents))
+		// Load events from node_events directory.
+		nodeEventsPrefix := sessionPrefix + "node_events/"
+		nodeEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, nodeEventsPrefix)
+		gg.Expect(err).NotTo(HaveOccurred())
+		uploadedEvents = append(uploadedEvents, nodeEvents...)
+		LogWithTimestamp(test.T(), "Loaded %d events from node_events", len(nodeEvents))
 
-	// Dynamically discover and load events from job_events directories.
-	jobEventsPrefix := sessionPrefix + "job_events/"
-	jobDirs, err := listS3Directories(s3Client, s3BucketName, jobEventsPrefix)
-	g.Expect(err).NotTo(HaveOccurred())
-	g.Expect(jobDirs).NotTo(BeEmpty())
-	LogWithTimestamp(test.T(), "Found %d job directories: %v", len(jobDirs), jobDirs)
-
-	for _, jobDir := range jobDirs {
-		jobDirPrefix := jobEventsPrefix + jobDir + "/"
-		jobEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, jobDirPrefix)
-		g.Expect(err).NotTo(HaveOccurred())
-		uploadedEvents = append(uploadedEvents, jobEvents...)
-		LogWithTimestamp(test.T(), "Loaded %d events from job_events/%s", len(jobEvents), jobDir)
-	}
+		// Dynamically discover and load events from job_events directories.
+		jobEventsPrefix := sessionPrefix + "job_events/"
+		jobDirs, err := listS3Directories(s3Client, s3BucketName, jobEventsPrefix)
+		gg.Expect(err).NotTo(HaveOccurred())
+		gg.Expect(jobDirs).NotTo(BeEmpty())
+		LogWithTimestamp(test.T(), "Found %d job directories: %v", len(jobDirs), jobDirs)
+
+		for _, jobDir := range jobDirs {
+			jobDirPrefix := jobEventsPrefix + jobDir + "/"
+			jobEvents, err := loadRayEventsFromS3(s3Client, s3BucketName, jobDirPrefix)
+			gg.Expect(err).NotTo(HaveOccurred())
+			uploadedEvents = append(uploadedEvents, jobEvents...)
+			LogWithTimestamp(test.T(), "Loaded %d events from job_events/%s", len(jobEvents), jobDir)
+		}
 
-	assertAllEventTypesCovered(test, g, uploadedEvents)
+		assertAllEventTypesCovered(test, gg, uploadedEvents)
+	}, TestTimeoutMedium).Should(Succeed())
 }
 
 // getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink
@@ -601,9 +577,9 @@ fi`
 	return sessionID
 }
 
-// getNodeIDFromHeadPod retrieves the nodeID from the Ray head pod by reading /tmp/ray/raylet_node_id.
-func getNodeIDFromHeadPod(test Test, g *WithT, rayCluster *rayv1.RayCluster) string {
-	headPod, err := GetHeadPod(test, rayCluster)
+// getNodeIDFromPod retrieves the nodeID from the Ray head or worker pod by reading /tmp/ray/raylet_node_id.
+func getNodeIDFromPod(test Test, g *WithT, getPod func() (*corev1.Pod, error), containerName string) string {
+	pod, err := getPod()
 	g.Expect(err).NotTo(HaveOccurred())
 
 	getNodeIDCmd := `if [ -f "/tmp/ray/raylet_node_id" ]; then
@@ -612,7 +588,7 @@ else
 	echo "raylet_node_id not found"
 	exit 1
 fi`
-	output, _ := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", getNodeIDCmd})
+	output, _ := ExecPodCmd(test, pod, containerName, []string{"sh", "-c", getNodeIDCmd})
 
 	// Parse output to extract the nodeID.
 	nodeID := strings.TrimSpace(output.String())
@@ -742,9 +718,29 @@ func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEv
 	return events, nil
 }
+// For a Ray cluster with one head node and one worker node, there are two log directories to verify: +// - logs// +// - logs// +func assertFileExist(test Test, g *WithT, s3Client *s3.S3, nodeLogDirPrefix string, fileName string) { + fileKey := fmt.Sprintf("%s/%s", nodeLogDirPrefix, fileName) + LogWithTimestamp(test.T(), "Verifying file %s exists", fileKey) + g.Eventually(func(gg Gomega) { + _, err := s3Client.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(s3BucketName), + Key: aws.String(fileKey), + }) + gg.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Verified file %s exists", fileKey) + }, TestTimeoutMedium).Should(Succeed(), "Failed to verify file %s exists", fileKey) +} + // assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3. // NOTE: EVENT_TYPE_UNSPECIFIED is excluded from verification. -func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { +// +// This function accepts Gomega (not *WithT) because it's called from within g.Eventually() callbacks, +// which provide a nested Gomega instance for assertions. +func assertAllEventTypesCovered(test Test, g Gomega, events []rayEvent) { foundEventTypes := map[string]bool{} for _, event := range events { foundEventTypes[event.EventType] = true @@ -752,6 +748,7 @@ func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { for _, eventType := range types.AllEventTypes { if eventType == types.EVENT_TYPE_UNSPECIFIED { + LogWithTimestamp(test.T(), "Skipping verification for EVENT_TYPE_UNSPECIFIED") continue } g.Expect(foundEventTypes[string(eventType)]).To(BeTrue(), "Event type %s not found", eventType)