[Test] [history server] [collector] Ensure event type coverage #4343
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -2,6 +2,7 @@ package e2e | |||
|
|
||||
| import ( | ||||
| "context" | ||||
| "encoding/json" | ||||
| "fmt" | ||||
| "os/exec" | ||||
| "path/filepath" | ||||
|
|
@@ -18,8 +19,6 @@ import ( | |||
| "github.com/aws/aws-sdk-go/aws/session" | ||||
| "github.com/aws/aws-sdk-go/service/s3" | ||||
| rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" | ||||
| "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" | ||||
| rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" | ||||
| . "github.com/ray-project/kuberay/ray-operator/test/support" | ||||
| ) | ||||
|
|
||||
|
|
@@ -35,8 +34,33 @@ const ( | |||
| // Ray cluster | ||||
| rayClusterManifestPath = "../../config/raycluster.yaml" | ||||
| rayClusterID = "default" | ||||
|
|
||||
| // Ray job | ||||
| rayJobManifestPath = "../../config/rayjob.yaml" | ||||
| ) | ||||
|
|
||||
| // rayEventTypes includes all potential event types defined in Ray: | ||||
| // https://github.com/ray-project/ray/blob/3b41c97fa90c58b0b72c0026f57005b92310160d/src/ray/protobuf/public/events_base_event.proto#L49-L61 | ||||
| var rayEventTypes = []string{ | ||||
| "ACTOR_DEFINITION_EVENT", | ||||
| "ACTOR_LIFECYCLE_EVENT", | ||||
| "ACTOR_TASK_DEFINITION_EVENT", | ||||
| "DRIVER_JOB_DEFINITION_EVENT", | ||||
| "DRIVER_JOB_LIFECYCLE_EVENT", | ||||
| "TASK_DEFINITION_EVENT", | ||||
| "TASK_LIFECYCLE_EVENT", | ||||
| "TASK_PROFILE_EVENT", | ||||
| "NODE_DEFINITION_EVENT", | ||||
| "NODE_LIFECYCLE_EVENT", | ||||
| } | ||||
|
|
||||
| // rayEvent contains specific fields in the Ray event JSON schema. For now, we keep only two fields, | ||||
| // eventId and eventType, while ensuring future extensibility (e.g., sessionName, timestamp, sourceType). | ||||
| type rayEvent struct { | ||||
| EventID string `json:"eventId"` | ||||
| EventType string `json:"eventType"` | ||||
| } | ||||
|
|
||||
|
Comment on lines +45 to +49
Contributor (Author): We define this custom type (with specific fields) to represent the decoded Ray event for future extensibility. For example, we might want to do more fine-grained verification after the history server becomes stable.
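A minimal sketch of how that extensibility plays out with encoding/json: fields the struct does not declare are simply ignored during decoding, so additions like sessionName or timestamp (hypothetical names here, not part of this PR) can be added to rayEvent later without touching existing call sites.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical future shape of rayEvent; only EventID and EventType exist in this PR.
type rayEvent struct {
	EventID     string `json:"eventId"`
	EventType   string `json:"eventType"`
	SessionName string `json:"sessionName,omitempty"` // illustrative only
	Timestamp   string `json:"timestamp,omitempty"`   // illustrative only
}

func main() {
	// Undeclared fields (e.g. sourceType) are silently dropped by encoding/json,
	// so the narrow struct keeps decoding as the Ray event schema grows.
	raw := `{"eventId":"abc123","eventType":"TASK_LIFECYCLE_EVENT","sourceType":"CORE_WORKER"}`
	var ev rayEvent
	if err := json.Unmarshal([]byte(raw), &ev); err != nil {
		panic(err)
	}
	fmt.Println(ev.EventID, ev.EventType)
}
```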
||||
| func TestCollector(t *testing.T) { | ||||
| // Share a single S3 client among subtests. | ||||
| s3Client := ensureS3Client(t) | ||||
|
|
@@ -66,20 +90,25 @@ func TestCollector(t *testing.T) { | |||
| } | ||||
| } | ||||
|
|
||||
| // testCollectorUploadOnGracefulShutdown verifies that logs and node_events are successfully uploaded to S3 on cluster deletion. | ||||
| // testCollectorUploadOnGracefulShutdown verifies that logs, node_events, and job_events are successfully uploaded to S3 on cluster deletion. | ||||
| // | ||||
| // The test case follows these steps: | ||||
| // 1. Prepare test environment by applying a Ray cluster with the collector | ||||
| // 2. Submit a Ray job to the existing Ray cluster | ||||
| // 3. Get the sessionID and nodeID for further verification | ||||
| // 4. Delete the Ray cluster to trigger log uploading and event flushing on deletion. When the Ray cluster is deleted, | ||||
| // logs and node_events are processed as follows: | ||||
| // logs, node_events, and job_events are processed as follows: | ||||
| // - logs: Trigger RayLogHandler.processSessionLatestLog to process logs under /tmp/ray/session_latest | ||||
| // - node_events: Trigger EventServer.flushEvents to process in-memory events | ||||
| // - node_events: Trigger EventServer.flushEvents, which calls es.flushNodeEventsForHour to process in-memory node events | ||||
| // - job_events: Trigger EventServer.flushEvents, which calls es.flushJobEventsForHour to process in-memory job events | ||||
| // | ||||
| // 5. Verify logs and node_events are successfully uploaded to S3. Expected S3 path structure: | ||||
| // 5. Verify logs, node_events, and job_events are successfully uploaded to S3. Expected S3 path structure: | ||||
| // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/logs/... | ||||
| // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/node_events/... | ||||
| // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AgAAAA==/... | ||||
| // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AQAAAA==/... | ||||
| // | ||||
| // For detailed verification logic, please refer to verifyS3SessionDirs. | ||||
| // | ||||
| // 6. Delete S3 bucket to ensure test isolation | ||||
| func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) { | ||||
|
|
@@ -103,7 +132,7 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev | |||
| return err | ||||
| }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) | ||||
|
|
||||
| // Verify logs and node_events are successfully uploaded to S3. | ||||
| // Verify logs, node_events, and job_events are successfully uploaded to S3. | ||||
| verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) | ||||
|
|
||||
| // Delete S3 bucket to ensure test isolation. | ||||
|
|
@@ -145,23 +174,21 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 | |||
| // Since Kubernetes 1.28 (with cgroups v2 enabled), `memory.oom.group` is enabled by default: when any process in a cgroup | ||||
| // hits the memory limit, all processes in the container are killed together, thereby triggering container restart. | ||||
| // For more details, please refer to https://github.com/kubernetes/kubernetes/pull/117793 | ||||
| LogWithTimestamp(test.T(), "Killing main process of ray-head container to trigger a restart") | ||||
| g.Eventually(func(gg Gomega) { | ||||
| headPod, err := GetHeadPod(test, rayCluster) | ||||
| gg.Expect(err).NotTo(HaveOccurred()) | ||||
| _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"kill", "1"}) | ||||
| gg.Expect(stderr.String()).To(BeEmpty()) | ||||
| }, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of ray-head container") | ||||
|
|
||||
| LogWithTimestamp(test.T(), "Waiting for ray-head container to restart and become ready") | ||||
| g.Eventually(func(gg Gomega) { | ||||
| updatedPod, err := GetHeadPod(test, rayCluster) | ||||
| gg.Expect(err).NotTo(HaveOccurred()) | ||||
| rayHeadStatus, err := getContainerStatusByName(updatedPod, "ray-head") | ||||
| gg.Expect(err).NotTo(HaveOccurred()) | ||||
| gg.Expect(rayHeadStatus.RestartCount).To(BeNumerically(">", 0)) | ||||
| gg.Expect(rayHeadStatus.Ready).To(BeTrue()) | ||||
| }, TestTimeoutShort).Should(Succeed(), "ray-head container should restart and become ready") | ||||
| killContainerAndWaitForRestart(test, g, rayCluster, "ray-head", func() (*corev1.Pod, error) { | ||||
| return GetHeadPod(test, rayCluster) | ||||
| }) | ||||
| // TODO(jwj): Clarify the automatic restart mechanism. | ||||
| // Force kill the ray-worker container before automatic restart. | ||||
| killContainerAndWaitForRestart(test, g, rayCluster, "ray-worker", func() (*corev1.Pod, error) { | ||||
| workerPods, err := GetWorkerPods(test, rayCluster) | ||||
| if err != nil { | ||||
| return nil, err | ||||
| } | ||||
| if len(workerPods) == 0 { | ||||
| return nil, fmt.Errorf("no worker pods found") | ||||
| } | ||||
| return &workerPods[0], nil | ||||
| }) | ||||
|
|
||||
| // Verify the old session logs have been processed on disk. | ||||
| dirs := []string{"prev-logs", "persist-complete-logs"} | ||||
|
|
@@ -178,7 +205,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 | |||
| }, TestTimeoutMedium).Should(Succeed(), "Session directory %s should exist in %s", sessionID, dirPath) | ||||
| } | ||||
|
|
||||
| // Verify logs and node_events are successfully uploaded to S3. | ||||
| // Verify logs, node_events, and job_events are successfully uploaded to S3. | ||||
| verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) | ||||
|
|
||||
| deleteS3Bucket(test, g, s3Client) | ||||
|
|
@@ -347,14 +374,12 @@ func applyRayCluster(test Test, g *WithT, namespace *corev1.Namespace) *rayv1.Ra | |||
|
|
||||
| // applyRayJobToCluster applies a Ray job to the existing Ray cluster. | ||||
| func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayCluster *rayv1.RayCluster) *rayv1.RayJob { | ||||
| jobScript := "import ray; ray.init(); print(ray.cluster_resources())" | ||||
| rayJobAC := rayv1ac.RayJob("ray-job", namespace.Name). | ||||
| WithSpec(rayv1ac.RayJobSpec(). | ||||
| WithClusterSelector(map[string]string{utils.RayClusterLabelKey: rayCluster.Name}). | ||||
| WithEntrypoint(fmt.Sprintf("python -c %q", jobScript)). | ||||
| WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) | ||||
|
|
||||
| rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) | ||||
| rayJobFromYaml := DeserializeRayJobYAML(test, rayJobManifestPath) | ||||
| rayJobFromYaml.Namespace = namespace.Name | ||||
|
|
||||
| rayJob, err := test.Client().Ray().RayV1(). | ||||
| RayJobs(namespace.Name). | ||||
| Create(test.Ctx(), rayJobFromYaml, metav1.CreateOptions{}) | ||||
|
||||
| g.Expect(err).NotTo(HaveOccurred()) | ||||
| LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) | ||||
|
|
||||
|
|
@@ -369,13 +394,20 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC | |||
| return rayJob | ||||
| } | ||||
|
|
||||
| // verifyS3SessionDirs verifies that directories logs/ and node_events/ exist under a session prefix in S3. | ||||
| // This helper function checks that each directory contains at least one object. | ||||
| // Additionally, it verifies that specific files have content: | ||||
| // - logs/<nodeID>/raylet.out must exist and have content > 0 bytes | ||||
| // - node_events/<nodeID>_<suffix> must exist and have content > 0 bytes (suffix can be ignored for verification) | ||||
| // verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3. | ||||
| // There are two phases of verification: | ||||
| // 1. Verify file contents in logs/ directory | ||||
| // - logs/<nodeID>/raylet.out must exist and have content > 0 bytes | ||||
| // - TODO(jwj): Complete docs. | ||||
| // | ||||
| // 2. Verify event type coverage in node_events/ and job_events/ directories | ||||
| // - Aggregate all events from node_events/ and job_events/ directories | ||||
| // - Verify that all potential event types are present in the aggregated events | ||||
| // | ||||
| // NOTE: Since flushed node and job events are nondeterministic, we need to aggregate them first before verifying event type coverage. | ||||
| func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) { | ||||
| dirs := []string{"logs", "node_events"} | ||||
| // TODO(jwj): Separate verification for logs and events. | ||||
|
||||
| dirs := []string{"logs"} | ||||
|
Member: Is it possible to parse this from kuberay/historyserver/config/raycluster.yaml? (Line 101 in 79b5c30)
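For context, the verification code later in this file hardcodes the job_events/ sub-prefixes (AgAAAA==, AQAAAA==). A hedged alternative sketch, not what the PR implements and not necessarily what the reviewer is suggesting, would discover those sub-prefixes from S3 at runtime with a delimiter listing; listJobEventDirs is a hypothetical helper name, and the packages used are the same aws-sdk-go v1 ones this test file already imports.

```go
package e2e

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// listJobEventDirs returns the "subdirectory" prefixes under <sessionPrefix>job_events/,
// e.g. "<sessionPrefix>job_events/AgAAAA==/", instead of hardcoding the base64 job IDs.
func listJobEventDirs(s3Client *s3.S3, bucket, sessionPrefix string) ([]string, error) {
	out, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
		Bucket:    aws.String(bucket),
		Prefix:    aws.String(sessionPrefix + "job_events/"),
		Delimiter: aws.String("/"),
	})
	if err != nil {
		return nil, err
	}
	// With a delimiter, S3 reports each distinct sub-prefix once in CommonPrefixes.
	dirs := make([]string, 0, len(out.CommonPrefixes))
	for _, cp := range out.CommonPrefixes {
		dirs = append(dirs, aws.StringValue(cp.Prefix))
	}
	return dirs, nil
}
```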
||||
| for _, dir := range dirs { | ||||
| dirPrefix := sessionPrefix + dir + "/" | ||||
|
|
||||
|
|
@@ -414,6 +446,16 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str | |||
| LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize) | ||||
| }, TestTimeoutMedium).Should(Succeed(), "Failed to verify at least one object in directory %s has content", dirPrefix) | ||||
| } | ||||
|
|
||||
| LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) | ||||
| uploadedEvents := []rayEvent{} | ||||
| for _, dir := range []string{"node_events", "job_events/AgAAAA==", "job_events/AQAAAA=="} { | ||||
| dirPrefix := sessionPrefix + dir + "/" | ||||
| events, err := loadRayEventsFromS3(s3Client, s3BucketName, dirPrefix) | ||||
| g.Expect(err).NotTo(HaveOccurred()) | ||||
| uploadedEvents = append(uploadedEvents, events...) | ||||
| } | ||||
| assertAllEventTypesCovered(test, g, uploadedEvents) | ||||
|
||||
| } | ||||
|
|
||||
| // getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink | ||||
|
|
@@ -461,6 +503,27 @@ fi` | |||
|
|
||||
| } | ||||
|
|
||||
| // killContainerAndWaitForRestart kills the main process of a container and waits for the container to restart and become ready. | ||||
| func killContainerAndWaitForRestart(test Test, g *WithT, rayCluster *rayv1.RayCluster, containerName string, getPod func() (*corev1.Pod, error)) { | ||||
| LogWithTimestamp(test.T(), "Killing main process of %s container to trigger a restart", containerName) | ||||
| g.Eventually(func(gg Gomega) { | ||||
| pod, err := getPod() | ||||
| gg.Expect(err).NotTo(HaveOccurred()) | ||||
| _, stderr := ExecPodCmd(test, pod, containerName, []string{"kill", "1"}) | ||||
| gg.Expect(stderr.String()).To(BeEmpty()) | ||||
| }, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of %s container", containerName) | ||||
|
|
||||
| LogWithTimestamp(test.T(), "Waiting for %s container to restart and become ready", containerName) | ||||
| g.Eventually(func(gg Gomega) { | ||||
| pod, err := getPod() | ||||
| gg.Expect(err).NotTo(HaveOccurred()) | ||||
| containerStatus, err := getContainerStatusByName(pod, containerName) | ||||
| gg.Expect(err).NotTo(HaveOccurred()) | ||||
| gg.Expect(containerStatus.RestartCount).To(BeNumerically(">", 0)) | ||||
| gg.Expect(containerStatus.Ready).To(BeTrue()) | ||||
| }, TestTimeoutShort).Should(Succeed(), "%s container should restart and become ready", containerName) | ||||
| } | ||||
|
|
||||
| // getContainerStatusByName retrieves the container status by container name. | ||||
| // NOTE: ContainerStatuses order doesn't guarantee to match Spec.Containers order. | ||||
| // For more details, please refer to the following link: | ||||
|
|
@@ -473,3 +536,54 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co | |||
| } | ||||
| return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name) | ||||
| } | ||||
|
|
||||
| // loadRayEventsFromS3 loads Ray events from the first file object found under the given prefix in S3. | ||||
| func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEvent, error) { | ||||
| objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ | ||||
| Bucket: aws.String(bucket), | ||||
| Prefix: aws.String(prefix), | ||||
| }) | ||||
| if err != nil { | ||||
| return nil, err | ||||
| } | ||||
|
|
||||
| // Find the first file object for loading Ray events. | ||||
| var fileKey string | ||||
| for _, obj := range objects.Contents { | ||||
| if key := aws.StringValue(obj.Key); !strings.HasSuffix(key, "/") { | ||||
| fileKey = key | ||||
| break | ||||
| } | ||||
| } | ||||
|
||||
| if fileKey == "" { | ||||
| return nil, fmt.Errorf("no file object found in directory %s", prefix) | ||||
| } | ||||
|
|
||||
| // Get and decode the file object content into Ray events. | ||||
| content, err := s3Client.GetObject(&s3.GetObjectInput{ | ||||
| Bucket: aws.String(bucket), | ||||
| Key: aws.String(fileKey), | ||||
| }) | ||||
| if err != nil { | ||||
| return nil, err | ||||
| } | ||||
| defer content.Body.Close() | ||||
|
|
||||
| var events []rayEvent | ||||
| if err := json.NewDecoder(content.Body).Decode(&events); err != nil { | ||||
| return nil, err | ||||
| } | ||||
| return events, nil | ||||
| } | ||||
|
|
||||
| // assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3. | ||||
| func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { | ||||
| foundEventTypes := map[string]bool{} | ||||
| for _, event := range events { | ||||
| foundEventTypes[event.EventType] = true | ||||
| } | ||||
|
|
||||
| for _, eventType := range rayEventTypes { | ||||
| g.Expect(foundEventTypes[eventType]).To(BeTrue(), "Event type %s not found", eventType) | ||||
| } | ||||
| } | ||||
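One possible follow-up tweak, sketched here as an in-file variant rather than a change the PR makes: assertAllEventTypesCovered above stops at the first missing type, since each failed g.Expect aborts the test, whereas collecting the gaps first reports the full coverage picture in one run. assertAllEventTypesCoveredAtOnce is a hypothetical name, and the sketch assumes the dot-imported Gomega helpers and file-local rayEvent/rayEventTypes used throughout this file.

```go
// assertAllEventTypesCoveredAtOnce gathers every missing event type before failing,
// so a single run shows the complete coverage gap rather than just the first miss.
func assertAllEventTypesCoveredAtOnce(g *WithT, events []rayEvent) {
	found := map[string]bool{}
	for _, event := range events {
		found[event.EventType] = true
	}

	missing := []string{}
	for _, eventType := range rayEventTypes {
		if !found[eventType] {
			missing = append(missing, eventType)
		}
	}
	g.Expect(missing).To(BeEmpty(), "missing event types: %v", missing)
}
```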
Can we define this in event.go, similar to how AllJobStatuses is defined in kuberay/ray-operator/apis/ray/v1/rayjob_types.go (lines 17 to 33 in 910223a)?
Contributor (Author): Hi @win5923, thanks for reviewing! Fixed at d94d02a.
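For reference, a rough sketch of the pattern the reviewer points to, applied to the event type strings from this PR. RayEventType, AllRayEventTypes, and the constant names are illustrative assumptions, not necessarily what d94d02a ended up with.

```go
// RayEventType mirrors the AllJobStatuses pattern: a typed string enum plus an
// exhaustive slice that tests can range over.
type RayEventType string

const (
	ActorDefinitionEvent     RayEventType = "ACTOR_DEFINITION_EVENT"
	ActorLifecycleEvent      RayEventType = "ACTOR_LIFECYCLE_EVENT"
	ActorTaskDefinitionEvent RayEventType = "ACTOR_TASK_DEFINITION_EVENT"
	DriverJobDefinitionEvent RayEventType = "DRIVER_JOB_DEFINITION_EVENT"
	DriverJobLifecycleEvent  RayEventType = "DRIVER_JOB_LIFECYCLE_EVENT"
	TaskDefinitionEvent      RayEventType = "TASK_DEFINITION_EVENT"
	TaskLifecycleEvent       RayEventType = "TASK_LIFECYCLE_EVENT"
	TaskProfileEvent         RayEventType = "TASK_PROFILE_EVENT"
	NodeDefinitionEvent      RayEventType = "NODE_DEFINITION_EVENT"
	NodeLifecycleEvent       RayEventType = "NODE_LIFECYCLE_EVENT"
)

// AllRayEventTypes lets the e2e test assert coverage without keeping its own copy of the list.
var AllRayEventTypes = []RayEventType{
	ActorDefinitionEvent,
	ActorLifecycleEvent,
	ActorTaskDefinitionEvent,
	DriverJobDefinitionEvent,
	DriverJobLifecycleEvent,
	TaskDefinitionEvent,
	TaskLifecycleEvent,
	TaskProfileEvent,
	NodeDefinitionEvent,
	NodeLifecycleEvent,
}
```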