[Test] [historyserver] [collector] Enhance log file coverage test for both head and worker pods #4351
Merged: rueian merged 34 commits into ray-project:master from JiangJiaWei1103:epic-4274/e2e-test-head-worker-logs on Jan 20, 2026.
Changes in this view are from 16 of the 34 commits.

Commits (34):
- b0172bc [historyserver] Fix getJobID for job event collection (Future-Outlier)
- eeb7a7f add jia-wei as co-author, since he debug with me together (Future-Outlier)
- 0ff5bcf remove unused code (Future-Outlier)
- 5aea305 update rueian's advice (Future-Outlier)
- 5f547a2 add task profile event example (Future-Outlier)
- 0177160 revert back oneof solution (Future-Outlier)
- 420ce9a add task profile event (Future-Outlier)
- 0621ba4 test: Test event type coverage in happy path (JiangJiaWei1103)
- 07403f7 Merge branch 'my-master' into epic-4274/e2e-test-coverage-of-event-types (JiangJiaWei1103)
- de153d1 refactor: Remove redundant code (JiangJiaWei1103)
- d1c2b18 test: Verify event type coverage of aggregated node and job events (JiangJiaWei1103)
- 14a9b52 test: Force kill worker container and verify event coverage (JiangJiaWei1103)
- 598dbfd refactor: Create an WorkerPods adapter and remove redundancy (JiangJiaWei1103)
- 7492761 test: Check both head and worker logs (JiangJiaWei1103)
- 9611a42 refactor: Use eventually to wrap coverage check (JiangJiaWei1103)
- 86e73da test: Check raylet.out and gcs_server.out (JiangJiaWei1103)
- ecf3648 docs: Correct docs (JiangJiaWei1103)
- 9f70a21 refactor: List subdirs of job_events rather than hardcoding (JiangJiaWei1103)
- 84519bc fix: Wait for async job events flushing on worker (JiangJiaWei1103)
- c796bfc test: Consolidate tests by checking non-empty list (JiangJiaWei1103)
- c3f73b1 fix: Aggregate all event files not just the first file obj (JiangJiaWei1103)
- 302d903 fix: Avoid redundant appends (JiangJiaWei1103)
- 6237c07 list job (Future-Outlier)
- 5c8fc59 fix: Explicitly close content body to avoid resource leaks (JiangJiaWei1103)
- 10237ec docs: Remove redundant notes (JiangJiaWei1103)
- 0e28fa2 fix: Close content on failure to prevent rsc leak (JiangJiaWei1103)
- 48681ab docs: Update helper usage (JiangJiaWei1103)
- 5c5c878 Merge branch 'my-master' into epic-4274/e2e-test-coverage-of-event-types (JiangJiaWei1103)
- e640798 test: Test log file existence only (JiangJiaWei1103)
- 706199c style: Remove trailing slash (JiangJiaWei1103)
- 6f47105 docs: State why we use sleep (JiangJiaWei1103)
- 3a2bc7a Merge branch 'epic-4274/e2e-test-coverage-of-event-types' into epic-4… (JiangJiaWei1103)
- 4204a5c Merge branch 'my-master' into epic-4274/e2e-test-head-worker-logs (JiangJiaWei1103)
- 4529c9f refactor: Wrap event type coverage assertion in eventually (JiangJiaWei1103)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ package e2e | |
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "os/exec" | ||
| "path/filepath" | ||
|
|
@@ -18,8 +19,6 @@ import ( | |
| "github.com/aws/aws-sdk-go/aws/session" | ||
| "github.com/aws/aws-sdk-go/service/s3" | ||
| rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" | ||
| "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" | ||
| rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" | ||
| . "github.com/ray-project/kuberay/ray-operator/test/support" | ||
| ) | ||
|
|
||
|
|
@@ -35,8 +34,33 @@ const ( | |
| // Ray cluster | ||
| rayClusterManifestPath = "../../config/raycluster.yaml" | ||
| rayClusterID = "default" | ||
|
|
||
| // Ray job | ||
| rayJobManifestPath = "../../config/rayjob.yaml" | ||
| ) | ||
|
|
||
| // rayEventTypes includes all potential event types defined in Ray: | ||
| // https://github.com/ray-project/ray/blob/3b41c97fa90c58b0b72c0026f57005b92310160d/src/ray/protobuf/public/events_base_event.proto#L49-L61 | ||
| var rayEventTypes = []string{ | ||
| "ACTOR_DEFINITION_EVENT", | ||
| "ACTOR_LIFECYCLE_EVENT", | ||
| "ACTOR_TASK_DEFINITION_EVENT", | ||
| "DRIVER_JOB_DEFINITION_EVENT", | ||
| "DRIVER_JOB_LIFECYCLE_EVENT", | ||
| "TASK_DEFINITION_EVENT", | ||
| "TASK_LIFECYCLE_EVENT", | ||
| "TASK_PROFILE_EVENT", | ||
| "NODE_DEFINITION_EVENT", | ||
| "NODE_LIFECYCLE_EVENT", | ||
| } | ||
|
|
||
| // rayEvent contains specific fields in the Ray event JSON schema. For now, we keep only two fields, | ||
| // eventId and eventType while ensuring future extensibility (e.g., sessionName, timestamp, sourceType, etc.). | ||
| type rayEvent struct { | ||
| EventID string `json:"eventId"` | ||
| EventType string `json:"eventType"` | ||
| } | ||
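As an aside on the struct above: with Go's encoding/json, only the fields that are explicitly mapped are populated, and any other fields in the event payload are silently ignored, which is what makes this minimal schema forward-compatible. A standalone sketch with a made-up payload (not taken from the PR) illustrating that behavior:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rayEvent mirrors the two fields the test cares about; unmapped fields are ignored by the decoder.
type rayEvent struct {
	EventID   string `json:"eventId"`
	EventType string `json:"eventType"`
}

func main() {
	// Hypothetical event file content: a JSON array of event objects with extra fields.
	payload := []byte(`[{"eventId":"evt-1","eventType":"TASK_DEFINITION_EVENT","sessionName":"ignored"}]`)

	var events []rayEvent
	if err := json.Unmarshal(payload, &events); err != nil {
		panic(err)
	}
	fmt.Println(events[0].EventType) // TASK_DEFINITION_EVENT
}
```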
|
|
||
| func TestCollector(t *testing.T) { | ||
| // Share a single S3 client among subtests. | ||
| s3Client := ensureS3Client(t) | ||
|
|
@@ -66,20 +90,25 @@ func TestCollector(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| // testCollectorUploadOnGracefulShutdown verifies that logs and node_events are successfully uploaded to S3 on cluster deletion. | ||
| // testCollectorUploadOnGracefulShutdown verifies that logs, node_events, and job_events are successfully uploaded to S3 on cluster deletion. | ||
| // | ||
| // The test case follows these steps: | ||
| // 1. Prepare test environment by applying a Ray cluster with the collector | ||
| // 2. Submit a Ray job to the existing Ray cluster | ||
| // 3. Get the sessionID and nodeID for further verification | ||
| // 4. Delete the Ray cluster to trigger log uploading and event flushing on deletion. When the Ray cluster is deleted, | ||
| // logs and node_events are processed as follows: | ||
| // logs, node_events, and job_events are processed as follows: | ||
| // - logs: Trigger RayLogHandler.processSessionLatestLog to process logs under /tmp/ray/session_latest | ||
| // - node_events: Trigger EventServer.flushEvents to process in-memory events | ||
| // - node_events: Trigger EventServer.flushEvents, which calls es.flushNodeEventsForHour to process in-memory node events | ||
| // - job_events: Trigger EventServer.flushEvents, which calls es.flushJobEventsForHour to process in-memory job events | ||
| // | ||
| // 5. Verify logs and node_events are successfully uploaded to S3. Expected S3 path structure: | ||
| // 5. Verify logs, node_events, and job_events are successfully uploaded to S3. Expected S3 path structure: | ||
| // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/logs/... | ||
| // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/node_events/... | ||
| // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AgAAAA==/... | ||
| // - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AQAAAA==/... | ||
| // | ||
| // For detailed verification logic, please refer to verifyS3SessionDirs. | ||
| // | ||
| // 6. Delete S3 bucket to ensure test isolation | ||
| func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) { | ||
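A note on the job_events subdirectory names listed above: AQAAAA== and AgAAAA== look like base64-encoded 4-byte values, which decode to 1 and 2 when read as little-endian integers, consistent with them being per-job IDs; the PR itself does not spell out the encoding, so treat this as an interpretation rather than a documented contract. A minimal sketch of that assumption:

```go
package main

import (
	"encoding/base64"
	"encoding/binary"
	"fmt"
)

func main() {
	// Assumption for illustration only: each job_events subdirectory name is the
	// base64 encoding of a 4-byte little-endian job ID.
	for _, dir := range []string{"AQAAAA==", "AgAAAA=="} {
		raw, err := base64.StdEncoding.DecodeString(dir)
		if err != nil || len(raw) != 4 {
			panic(fmt.Sprintf("unexpected directory name %q", dir))
		}
		fmt.Printf("%s -> job ID %d\n", dir, binary.LittleEndian.Uint32(raw))
	}
}
```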
|
|
@@ -88,9 +117,11 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev | |
| // Submit a Ray job to the existing cluster. | ||
| _ = applyRayJobToCluster(test, g, namespace, rayCluster) | ||
|
|
||
| // Define variables for constructing S3 object prefix. | ||
| clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) | ||
| sessionID := getSessionIDFromHeadPod(test, g, rayCluster) | ||
| nodeID := getNodeIDFromHeadPod(test, g, rayCluster) | ||
| headNodeID := getNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head") | ||
| workerNodeID := getNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker") | ||
| sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, sessionID) | ||
|
|
||
| // Delete the Ray cluster to trigger log uploading and event flushing on deletion. | ||
|
|
@@ -103,8 +134,8 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev | |
| return err | ||
| }, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue())) | ||
|
|
||
| // Verify logs and node_events are successfully uploaded to S3. | ||
| verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) | ||
| // Verify logs, node_events, and job_events are successfully uploaded to S3. | ||
| verifyS3SessionDirs(test, g, s3Client, sessionPrefix, headNodeID, workerNodeID) | ||
|
|
||
| // Delete S3 bucket to ensure test isolation. | ||
| deleteS3Bucket(test, g, s3Client) | ||
|
|
@@ -136,7 +167,8 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 | |
|
|
||
| clusterNameID := fmt.Sprintf("%s_%s", rayCluster.Name, rayClusterID) | ||
| sessionID := getSessionIDFromHeadPod(test, g, rayCluster) | ||
| nodeID := getNodeIDFromHeadPod(test, g, rayCluster) | ||
| headNodeID := getNodeIDFromPod(test, g, HeadPod(test, rayCluster), "ray-head") | ||
| workerNodeID := getNodeIDFromPod(test, g, FirstWorkerPod(test, rayCluster), "ray-worker") | ||
| sessionPrefix := fmt.Sprintf("log/%s/%s/", clusterNameID, sessionID) | ||
|
|
||
| // NOTE: We use `kill 1` to simulate Kubernetes OOMKilled behavior. | ||
|
|
@@ -145,23 +177,10 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 | |
| // Since Kubernetes 1.28 (with cgroups v2 enabled), `memory.oom.group` is enabled by default: when any process in a cgroup | ||
| // hits the memory limit, all processes in the container are killed together, thereby triggering container restart. | ||
| // For more details, please refer to https://github.com/kubernetes/kubernetes/pull/117793 | ||
| LogWithTimestamp(test.T(), "Killing main process of ray-head container to trigger a restart") | ||
| g.Eventually(func(gg Gomega) { | ||
| headPod, err := GetHeadPod(test, rayCluster) | ||
| gg.Expect(err).NotTo(HaveOccurred()) | ||
| _, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"kill", "1"}) | ||
| gg.Expect(stderr.String()).To(BeEmpty()) | ||
| }, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of ray-head container") | ||
|
|
||
| LogWithTimestamp(test.T(), "Waiting for ray-head container to restart and become ready") | ||
| g.Eventually(func(gg Gomega) { | ||
| updatedPod, err := GetHeadPod(test, rayCluster) | ||
| gg.Expect(err).NotTo(HaveOccurred()) | ||
| rayHeadStatus, err := getContainerStatusByName(updatedPod, "ray-head") | ||
| gg.Expect(err).NotTo(HaveOccurred()) | ||
| gg.Expect(rayHeadStatus.RestartCount).To(BeNumerically(">", 0)) | ||
| gg.Expect(rayHeadStatus.Ready).To(BeTrue()) | ||
| }, TestTimeoutShort).Should(Succeed(), "ray-head container should restart and become ready") | ||
| killContainerAndWaitForRestart(test, g, HeadPod(test, rayCluster), "ray-head") | ||
| // TODO(jwj): Clarify the automatic restart mechanism. | ||
| // Force kill the ray-worker container before automatic restart. | ||
| killContainerAndWaitForRestart(test, g, FirstWorkerPod(test, rayCluster), "ray-worker") | ||
|
|
||
| // Verify the old session logs have been processed on disk. | ||
| dirs := []string{"prev-logs", "persist-complete-logs"} | ||
|
|
@@ -178,8 +197,8 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1 | |
| }, TestTimeoutMedium).Should(Succeed(), "Session directory %s should exist in %s", sessionID, dirPath) | ||
| } | ||
|
|
||
| // Verify logs and node_events are successfully uploaded to S3. | ||
| verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID) | ||
| // Verify logs, node_events, and job_events are successfully uploaded to S3. | ||
| verifyS3SessionDirs(test, g, s3Client, sessionPrefix, headNodeID, workerNodeID) | ||
|
|
||
| deleteS3Bucket(test, g, s3Client) | ||
| } | ||
|
|
@@ -347,14 +366,12 @@ func applyRayCluster(test Test, g *WithT, namespace *corev1.Namespace) *rayv1.Ra | |
|
|
||
| // applyRayJobToCluster applies a Ray job to the existing Ray cluster. | ||
| func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayCluster *rayv1.RayCluster) *rayv1.RayJob { | ||
| jobScript := "import ray; ray.init(); print(ray.cluster_resources())" | ||
| rayJobAC := rayv1ac.RayJob("ray-job", namespace.Name). | ||
| WithSpec(rayv1ac.RayJobSpec(). | ||
| WithClusterSelector(map[string]string{utils.RayClusterLabelKey: rayCluster.Name}). | ||
| WithEntrypoint(fmt.Sprintf("python -c %q", jobScript)). | ||
| WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration())) | ||
|
|
||
| rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions) | ||
| rayJobFromYaml := DeserializeRayJobYAML(test, rayJobManifestPath) | ||
| rayJobFromYaml.Namespace = namespace.Name | ||
|
|
||
| rayJob, err := test.Client().Ray().RayV1(). | ||
| RayJobs(namespace.Name). | ||
| Create(test.Ctx(), rayJobFromYaml, metav1.CreateOptions{}) | ||
| g.Expect(err).NotTo(HaveOccurred()) | ||
| LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name) | ||
|
|
||
|
|
@@ -369,51 +386,40 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC | |
| return rayJob | ||
| } | ||
|
|
||
| // verifyS3SessionDirs verifies that directories logs/ and node_events/ exist under a session prefix in S3. | ||
| // This helper function checks that each directory contains at least one object. | ||
| // Additionally, it verifies that specific files have content: | ||
| // - logs/<nodeID>/raylet.out must exist and have content > 0 bytes | ||
| // - node_events/<nodeID>_<suffix> must exist and have content > 0 bytes (suffix can be ignored for verification) | ||
| func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) { | ||
| dirs := []string{"logs", "node_events"} | ||
| for _, dir := range dirs { | ||
| dirPrefix := sessionPrefix + dir + "/" | ||
| // verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3. | ||
| // There are two phases of verification: | ||
| // 1. Verify file contents in logs/ directory | ||
| // - For the head node, verify raylet.out and gcs_server.out exist and have content > 0 bytes | ||
| //  - For the worker node, verify raylet.out exists and has content > 0 bytes | ||
| // | ||
| // 2. Verify event type coverage in node_events/ and job_events/ directories | ||
| // - Aggregate all events from node_events/ and job_events/ directories | ||
| // - Verify that all potential event types are present in the aggregated events | ||
| // | ||
| // NOTE: Since flushed node and job events are nondeterministic, we need to aggregate them first before verifying event type coverage. | ||
| func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, headNodeID string, workerNodeID string) { | ||
| // Verify file contents in logs/ directory. | ||
| headLogDirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, headNodeID) | ||
| workerLogDirPrefix := fmt.Sprintf("%slogs/%s/", sessionPrefix, workerNodeID) | ||
|
|
||
| LogWithTimestamp(test.T(), "Verifying raylet.out and gcs_server.out exist in head log directory %s", headLogDirPrefix) | ||
| for _, fileName := range []string{"raylet.out", "gcs_server.out"} { | ||
| assertNonEmptyFileExist(test, g, s3Client, headLogDirPrefix, fileName) | ||
| } | ||
|
|
||
| g.Eventually(func(gg Gomega) { | ||
| // Verify the directory has at least one object. | ||
| objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ | ||
| Bucket: aws.String(s3BucketName), | ||
| Prefix: aws.String(dirPrefix), | ||
| MaxKeys: aws.Int64(10), | ||
| }) | ||
| gg.Expect(err).NotTo(HaveOccurred()) | ||
| keyCount := aws.Int64Value(objects.KeyCount) | ||
| gg.Expect(keyCount).To(BeNumerically(">", 0)) | ||
| LogWithTimestamp(test.T(), "Verified directory %s under %s has at least one object", dir, sessionPrefix) | ||
|
|
||
| // Find the first file object for content verification. | ||
| var fileObj *s3.Object | ||
| for _, obj := range objects.Contents { | ||
| if !strings.HasSuffix(aws.StringValue(obj.Key), "/") { | ||
| fileObj = obj | ||
| break | ||
| } | ||
| } | ||
| gg.Expect(fileObj).NotTo(BeNil(), "No file object found in directory %s", dirPrefix) | ||
|
|
||
| // Verify the file has content by checking file size. | ||
| fileKey := *fileObj.Key | ||
| LogWithTimestamp(test.T(), "Checking file: %s", fileKey) | ||
| obj, err := s3Client.HeadObject(&s3.HeadObjectInput{ | ||
| Bucket: aws.String(s3BucketName), | ||
| Key: aws.String(fileKey), | ||
| }) | ||
| gg.Expect(err).NotTo(HaveOccurred()) | ||
| fileSize := aws.Int64Value(obj.ContentLength) | ||
| gg.Expect(fileSize).To(BeNumerically(">", 0)) | ||
| LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize) | ||
| }, TestTimeoutMedium).Should(Succeed(), "Failed to verify at least one object in directory %s has content", dirPrefix) | ||
| LogWithTimestamp(test.T(), "Verifying raylet.out exists in worker log directory %s", workerLogDirPrefix) | ||
| assertNonEmptyFileExist(test, g, s3Client, workerLogDirPrefix, "raylet.out") | ||
|
|
||
| // Verify event type coverage in node_events/ and job_events/ directories. | ||
| LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes) | ||
| uploadedEvents := []rayEvent{} | ||
| for _, dir := range []string{"node_events", "job_events/AgAAAA==", "job_events/AQAAAA=="} { | ||
| dirPrefix := sessionPrefix + dir + "/" | ||
| events, err := loadRayEventsFromS3(s3Client, s3BucketName, dirPrefix) | ||
| g.Expect(err).NotTo(HaveOccurred()) | ||
| uploadedEvents = append(uploadedEvents, events...) | ||
| } | ||
| assertAllEventTypesCovered(test, g, uploadedEvents) | ||
|
||
| } | ||
|
|
||
| // getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink | ||
|
|
@@ -439,9 +445,9 @@ fi` | |
| return sessionID | ||
| } | ||
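The session-ID helper that ends here relies on /tmp/ray/session_latest being a symlink whose target's basename is the session ID. A minimal local sketch of the same idea (the path is an assumption based on Ray's usual layout; the real test resolves it by exec'ing into the head pod):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	// Assumed Ray layout: /tmp/ray/session_latest -> /tmp/ray/session_<timestamp>_<pid>
	target, err := os.Readlink("/tmp/ray/session_latest")
	if err != nil {
		panic(err)
	}
	fmt.Println(filepath.Base(target)) // the session ID used in the S3 key prefixes
}
```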
|
|
||
| // getNodeIDFromHeadPod retrieves the nodeID from the Ray head pod by reading /tmp/ray/raylet_node_id. | ||
| func getNodeIDFromHeadPod(test Test, g *WithT, rayCluster *rayv1.RayCluster) string { | ||
| headPod, err := GetHeadPod(test, rayCluster) | ||
| // getNodeIDFromPod retrieves the nodeID from the Ray head or worker pod by reading /tmp/ray/raylet_node_id. | ||
| func getNodeIDFromPod(test Test, g *WithT, getPod func() (*corev1.Pod, error), containerName string) string { | ||
| pod, err := getPod() | ||
| g.Expect(err).NotTo(HaveOccurred()) | ||
|
|
||
| getNodeIDCmd := `if [ -f "/tmp/ray/raylet_node_id" ]; then | ||
|
|
@@ -450,7 +456,7 @@ else | |
| echo "raylet_node_id not found" | ||
| exit 1 | ||
| fi` | ||
| output, _ := ExecPodCmd(test, headPod, "ray-head", []string{"sh", "-c", getNodeIDCmd}) | ||
| output, _ := ExecPodCmd(test, pod, containerName, []string{"sh", "-c", getNodeIDCmd}) | ||
|
|
||
| // Parse output to extract the nodeID. | ||
| nodeID := strings.TrimSpace(output.String()) | ||
|
|
@@ -461,6 +467,42 @@ fi` | |
|
|
||
| } | ||
|
|
||
| // FirstWorkerPod returns a function that gets the first worker pod from the Ray cluster. | ||
| // It adapts the WorkerPods function to be used with functions expecting a single pod. | ||
| func FirstWorkerPod(test Test, rayCluster *rayv1.RayCluster) func() (*corev1.Pod, error) { | ||
| return func() (*corev1.Pod, error) { | ||
| workerPods, err := GetWorkerPods(test, rayCluster) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if len(workerPods) == 0 { | ||
| return nil, fmt.Errorf("no worker pods found") | ||
| } | ||
| return &workerPods[0], nil | ||
| } | ||
| } | ||
|
|
||
| // killContainerAndWaitForRestart kills the main process of a container and waits for the container to restart and become ready. | ||
| func killContainerAndWaitForRestart(test Test, g *WithT, getPod func() (*corev1.Pod, error), containerName string) { | ||
| LogWithTimestamp(test.T(), "Killing main process of %s container to trigger a restart", containerName) | ||
| g.Eventually(func(gg Gomega) { | ||
| pod, err := getPod() | ||
| gg.Expect(err).NotTo(HaveOccurred()) | ||
| _, stderr := ExecPodCmd(test, pod, containerName, []string{"kill", "1"}) | ||
| gg.Expect(stderr.String()).To(BeEmpty()) | ||
| }, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of %s container", containerName) | ||
|
|
||
| LogWithTimestamp(test.T(), "Waiting for %s container to restart and become ready", containerName) | ||
| g.Eventually(func(gg Gomega) { | ||
| pod, err := getPod() | ||
| gg.Expect(err).NotTo(HaveOccurred()) | ||
| containerStatus, err := getContainerStatusByName(pod, containerName) | ||
| gg.Expect(err).NotTo(HaveOccurred()) | ||
| gg.Expect(containerStatus.RestartCount).To(BeNumerically(">", 0)) | ||
| gg.Expect(containerStatus.Ready).To(BeTrue()) | ||
| }, TestTimeoutShort).Should(Succeed(), "%s container should restart and become ready", containerName) | ||
| } | ||
|
|
||
| // getContainerStatusByName retrieves the container status by container name. | ||
| // NOTE: ContainerStatuses order doesn't guarantee to match Spec.Containers order. | ||
| // For more details, please refer to the following link: | ||
|
|
@@ -473,3 +515,74 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co | |
| } | ||
| return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name) | ||
| } | ||
|
|
||
| // loadRayEventsFromS3 loads Ray events from S3. | ||
| func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEvent, error) { | ||
| objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{ | ||
| Bucket: aws.String(bucket), | ||
| Prefix: aws.String(prefix), | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // Find the first file object for loading Ray events. | ||
| var fileKey string | ||
| for _, obj := range objects.Contents { | ||
| if key := aws.StringValue(obj.Key); !strings.HasSuffix(key, "/") { | ||
| fileKey = key | ||
| break | ||
| } | ||
|
||
| } | ||
| if fileKey == "" { | ||
| return nil, fmt.Errorf("no file object found in directory %s", prefix) | ||
| } | ||
|
|
||
| // Get and decode the file object content into Ray events. | ||
| content, err := s3Client.GetObject(&s3.GetObjectInput{ | ||
| Bucket: aws.String(bucket), | ||
| Key: aws.String(fileKey), | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer content.Body.Close() | ||
|
|
||
| var events []rayEvent | ||
| if err := json.NewDecoder(content.Body).Decode(&events); err != nil { | ||
| return nil, err | ||
| } | ||
| return events, nil | ||
| } | ||
|
|
||
| // assertNonEmptyFileExist verifies that a file exists and has content (> 0 bytes). | ||
| // For a Ray cluster with one head node and one worker node, there are two log directories to verify: | ||
| // - logs/<headNodeID>/ | ||
| // - logs/<workerNodeID>/ | ||
| func assertNonEmptyFileExist(test Test, g *WithT, s3Client *s3.S3, nodeLogDirPrefix string, fileName string) { | ||
|
||
| fileKey := fmt.Sprintf("%s/%s", nodeLogDirPrefix, fileName) | ||
|
||
| LogWithTimestamp(test.T(), "Verifying file %s has content (> 0 bytes)", fileKey) | ||
| g.Eventually(func(gg Gomega) { | ||
| // Verify the file has content by checking file size. | ||
| obj, err := s3Client.HeadObject(&s3.HeadObjectInput{ | ||
| Bucket: aws.String(s3BucketName), | ||
| Key: aws.String(fileKey), | ||
| }) | ||
| gg.Expect(err).NotTo(HaveOccurred()) | ||
| fileSize := aws.Int64Value(obj.ContentLength) | ||
| gg.Expect(fileSize).To(BeNumerically(">", 0)) | ||
| LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize) | ||
| }, TestTimeoutMedium).Should(Succeed(), "Failed to verify file %s has content (> 0 bytes)", fileKey) | ||
| } | ||
|
||
|
|
||
| // assertAllEventTypesCovered verifies that all potential event types are present in the events uploaded to S3. | ||
| func assertAllEventTypesCovered(test Test, g *WithT, events []rayEvent) { | ||
| foundEventTypes := map[string]bool{} | ||
| for _, event := range events { | ||
| foundEventTypes[event.EventType] = true | ||
| } | ||
|
|
||
| for _, eventType := range rayEventTypes { | ||
| g.Expect(foundEventTypes[eventType]).To(BeTrue(), "Event type %s not found", eventType) | ||
| } | ||
| } | ||