Merged
Commits (27)
b0172bc
[historyserver] Fix getJobID for job event collection
Future-Outlier Jan 5, 2026
eeb7a7f
add jia-wei as co-author, since he debugged this with me
Future-Outlier Jan 5, 2026
0ff5bcf
remove unused code
Future-Outlier Jan 5, 2026
5aea305
update rueian's advice
Future-Outlier Jan 5, 2026
5f547a2
add task profile event example
Future-Outlier Jan 5, 2026
0177160
revert back to the oneof solution
Future-Outlier Jan 5, 2026
420ce9a
add task profile event
Future-Outlier Jan 5, 2026
0621ba4
test: Test event type coverage in happy path
JiangJiaWei1103 Jan 5, 2026
07403f7
Merge branch 'my-master' into epic-4274/e2e-test-coverage-of-event-types
JiangJiaWei1103 Jan 6, 2026
de153d1
refactor: Remove redundant code
JiangJiaWei1103 Jan 6, 2026
d1c2b18
test: Verify event type coverage of aggregated node and job events
JiangJiaWei1103 Jan 6, 2026
14a9b52
test: Force kill worker container and verify event coverage
JiangJiaWei1103 Jan 6, 2026
598dbfd
refactor: Create a WorkerPods adapter and remove redundancy
JiangJiaWei1103 Jan 7, 2026
9611a42
refactor: Use Eventually to wrap coverage check
JiangJiaWei1103 Jan 9, 2026
9f70a21
refactor: List subdirs of job_events rather than hardcoding
JiangJiaWei1103 Jan 13, 2026
84519bc
fix: Wait for async job events flushing on worker
JiangJiaWei1103 Jan 13, 2026
c796bfc
test: Consolidate tests by checking non-empty list
JiangJiaWei1103 Jan 13, 2026
c3f73b1
fix: Aggregate all event files, not just the first file object
JiangJiaWei1103 Jan 13, 2026
302d903
fix: Avoid redundant appends
JiangJiaWei1103 Jan 13, 2026
5c8fc59
fix: Explicitly close content body to avoid resource leaks
JiangJiaWei1103 Jan 14, 2026
10237ec
docs: Remove redundant notes
JiangJiaWei1103 Jan 14, 2026
0e28fa2
fix: Close content on failure to prevent resource leak
JiangJiaWei1103 Jan 14, 2026
48681ab
docs: Update helper usage
JiangJiaWei1103 Jan 14, 2026
5c5c878
Merge branch 'my-master' into epic-4274/e2e-test-coverage-of-event-types
JiangJiaWei1103 Jan 14, 2026
6f47105
docs: State why we use sleep
JiangJiaWei1103 Jan 15, 2026
926aee4
Merge branch 'my-master' into epic-4274/e2e-test-coverage-of-event-types
JiangJiaWei1103 Jan 17, 2026
d94d02a
refactor: Define AllEventTypes for reusability and maintainability
JiangJiaWei1103 Jan 17, 2026
3 changes: 3 additions & 0 deletions historyserver/config/rayjob.yaml
@@ -6,6 +6,7 @@ spec:
entrypoint: |
python -c "
import ray
import time
ray.init()

@ray.remote
@@ -35,6 +36,8 @@ spec:
final_count = ray.get(counter.get_count.remote())
print(f'Final count: {final_count}')
print(f'Cluster resources: {ray.cluster_resources()}')

time.sleep(5)
Comment on lines 39 to 42
Member

Can you help me try ray.shutdown()? Maybe it will work.

Contributor Author

Unfortunately, it doesn't work either:

[Screenshot: 2026-01-15 at 7:59:52 PM]

Maybe the error is on the Ray side; let's keep tracking this issue!

Member

Created an issue here: ray-project/ray#60218

"
# Select the existing Ray cluster running the collector.
clusterSelector:
219 changes: 180 additions & 39 deletions historyserver/test/e2e/collector_test.go
@@ -2,6 +2,7 @@ package e2e

import (
"context"
"encoding/json"
"fmt"
"os/exec"
"path/filepath"
@@ -18,8 +19,6 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
. "github.com/ray-project/kuberay/ray-operator/test/support"
)

@@ -35,8 +34,33 @@ const (
// Ray cluster
rayClusterManifestPath = "../../config/raycluster.yaml"
rayClusterID = "default"

// Ray job
rayJobManifestPath = "../../config/rayjob.yaml"
)

// rayEventTypes includes all potential event types defined in Ray:
// https://github.com/ray-project/ray/blob/3b41c97fa90c58b0b72c0026f57005b92310160d/src/ray/protobuf/public/events_base_event.proto#L49-L61
var rayEventTypes = []string{
"ACTOR_DEFINITION_EVENT",
"ACTOR_LIFECYCLE_EVENT",
"ACTOR_TASK_DEFINITION_EVENT",
"DRIVER_JOB_DEFINITION_EVENT",
"DRIVER_JOB_LIFECYCLE_EVENT",
"TASK_DEFINITION_EVENT",
"TASK_LIFECYCLE_EVENT",
"TASK_PROFILE_EVENT",
"NODE_DEFINITION_EVENT",
"NODE_LIFECYCLE_EVENT",
}
Member
@win5923 Jan 16, 2026

Can we define this in event.go, similar to how AllJobStatuses is defined here:

const (
JobStatusNew JobStatus = ""
JobStatusPending JobStatus = "PENDING"
JobStatusRunning JobStatus = "RUNNING"
JobStatusStopped JobStatus = "STOPPED"
JobStatusSucceeded JobStatus = "SUCCEEDED"
JobStatusFailed JobStatus = "FAILED"
)
var AllJobStatuses = []JobStatus{
JobStatusNew,
JobStatusPending,
JobStatusRunning,
JobStatusStopped,
JobStatusSucceeded,
JobStatusFailed,
}

Contributor Author

Hi @win5923, thanks for reviewing! Fixed at d94d02a.

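For reference, a minimal sketch of what the AllEventTypes definition might look like after d94d02a; the type name, constant names, and placement in event.go are assumptions for illustration, not the merged code:

// Assumed sketch only: a typed event-type constant plus an AllEventTypes slice,
// mirroring the AllJobStatuses pattern quoted above.
type RayEventType string

const (
	ActorDefinitionEvent RayEventType = "ACTOR_DEFINITION_EVENT"
	ActorLifecycleEvent  RayEventType = "ACTOR_LIFECYCLE_EVENT"
	// ... remaining constants follow the same pattern ...
	NodeLifecycleEvent   RayEventType = "NODE_LIFECYCLE_EVENT"
)

var AllEventTypes = []RayEventType{
	ActorDefinitionEvent,
	ActorLifecycleEvent,
	// ...
	NodeLifecycleEvent,
}
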

// rayEvent contains specific fields in the Ray event JSON schema. For now, we keep only two fields,
// eventId and eventType, while ensuring future extensibility (e.g., sessionName, timestamp, sourceType, etc.).
type rayEvent struct {
EventID string `json:"eventId"`
EventType string `json:"eventType"`
}

Comment on lines +45 to +49
Contributor Author

We define this custom type (with specific fields) to represent the decoded Ray event for future extensibility. For example, we might want to do more fine-grained verification after the history server becomes stable.

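As a quick illustration of how this type is meant to be consumed, the standalone sketch below unmarshals a hand-written sample payload (the field values are made up) into rayEvent; loadRayEventsFromS3 further down in this file does the same against real S3 objects:

package main

import (
	"encoding/json"
	"fmt"
)

type rayEvent struct {
	EventID   string `json:"eventId"`
	EventType string `json:"eventType"`
}

func main() {
	// Illustrative payload only; real events carry more fields, which decoding simply ignores.
	sample := `[{"eventId":"abc123","eventType":"TASK_LIFECYCLE_EVENT","sessionName":"session_1"}]`

	var events []rayEvent
	if err := json.Unmarshal([]byte(sample), &events); err != nil {
		panic(err)
	}
	fmt.Println(events[0].EventType) // prints TASK_LIFECYCLE_EVENT
}
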
func TestCollector(t *testing.T) {
// Share a single S3 client among subtests.
s3Client := ensureS3Client(t)
@@ -66,20 +90,25 @@ func TestCollector(t *testing.T) {
}
}

// testCollectorUploadOnGracefulShutdown verifies that logs and node_events are successfully uploaded to S3 on cluster deletion.
// testCollectorUploadOnGracefulShutdown verifies that logs, node_events, and job_events are successfully uploaded to S3 on cluster deletion.
//
// The test case follows these steps:
// 1. Prepare test environment by applying a Ray cluster with the collector
// 2. Submit a Ray job to the existing Ray cluster
// 3. Get the sessionID and nodeID for further verification
// 4. Delete the Ray cluster to trigger log uploading and event flushing on deletion. When the Ray cluster is deleted,
// logs and node_events are processed as follows:
// logs, node_events, and job_events are processed as follows:
// - logs: Trigger RayLogHandler.processSessionLatestLog to process logs under /tmp/ray/session_latest
// - node_events: Trigger EventServer.flushEvents to process in-memory events
// - node_events: Trigger EventServer.flushEvents, which calls es.flushNodeEventsForHour to process in-memory node events
// - job_events: Trigger EventServer.flushEvents, which calls es.flushJobEventsForHour to process in-memory job events
//
// 5. Verify logs and node_events are successfully uploaded to S3. Expected S3 path structure:
// 5. Verify logs, node_events, and job_events are successfully uploaded to S3. Expected S3 path structure:
// - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/logs/...
// - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/node_events/...
// - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AgAAAA==/...
// - {s3BucketName}/log/{clusterName}_{clusterID}/{sessionID}/job_events/AQAAAA==/...
//
// For detailed verification logic, please refer to verifyS3SessionDirs.
//
// 6. Delete S3 bucket to ensure test isolation
func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev1.Namespace, s3Client *s3.S3) {
@@ -103,7 +132,7 @@ func testCollectorUploadOnGracefulShutdown(test Test, g *WithT, namespace *corev
return err
}, TestTimeoutMedium).Should(WithTransform(k8serrors.IsNotFound, BeTrue()))

// Verify logs and node_events are successfully uploaded to S3.
// Verify logs, node_events, and job_events are successfully uploaded to S3.
verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID)

// Delete S3 bucket to ensure test isolation.
@@ -145,23 +174,8 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1
// Since Kubernetes 1.28 (with cgroups v2 enabled), `memory.oom.group` is enabled by default: when any process in a cgroup
// hits the memory limit, all processes in the container are killed together, thereby triggering container restart.
// For more details, please refer to https://github.com/kubernetes/kubernetes/pull/117793
LogWithTimestamp(test.T(), "Killing main process of ray-head container to trigger a restart")
g.Eventually(func(gg Gomega) {
headPod, err := GetHeadPod(test, rayCluster)
gg.Expect(err).NotTo(HaveOccurred())
_, stderr := ExecPodCmd(test, headPod, "ray-head", []string{"kill", "1"})
gg.Expect(stderr.String()).To(BeEmpty())
}, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of ray-head container")

LogWithTimestamp(test.T(), "Waiting for ray-head container to restart and become ready")
g.Eventually(func(gg Gomega) {
updatedPod, err := GetHeadPod(test, rayCluster)
gg.Expect(err).NotTo(HaveOccurred())
rayHeadStatus, err := getContainerStatusByName(updatedPod, "ray-head")
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(rayHeadStatus.RestartCount).To(BeNumerically(">", 0))
gg.Expect(rayHeadStatus.Ready).To(BeTrue())
}, TestTimeoutShort).Should(Succeed(), "ray-head container should restart and become ready")
killContainerAndWaitForRestart(test, g, HeadPod(test, rayCluster), "ray-head")
killContainerAndWaitForRestart(test, g, FirstWorkerPod(test, rayCluster), "ray-worker")

// Verify the old session logs have been processed on disk.
dirs := []string{"prev-logs", "persist-complete-logs"}
@@ -178,7 +192,7 @@ func testCollectorSeparatesFilesBySession(test Test, g *WithT, namespace *corev1
}, TestTimeoutMedium).Should(Succeed(), "Session directory %s should exist in %s", sessionID, dirPath)
}

// Verify logs and node_events are successfully uploaded to S3.
// Verify logs, node_events, and job_events are successfully uploaded to S3.
verifyS3SessionDirs(test, g, s3Client, sessionPrefix, nodeID)

deleteS3Bucket(test, g, s3Client)
@@ -347,14 +361,12 @@ func applyRayCluster(test Test, g *WithT, namespace *corev1.Namespace) *rayv1.Ra

// applyRayJobToCluster applies a Ray job to the existing Ray cluster.
func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayCluster *rayv1.RayCluster) *rayv1.RayJob {
jobScript := "import ray; ray.init(); print(ray.cluster_resources())"
rayJobAC := rayv1ac.RayJob("ray-job", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithClusterSelector(map[string]string{utils.RayClusterLabelKey: rayCluster.Name}).
WithEntrypoint(fmt.Sprintf("python -c %q", jobScript)).
WithSubmitterPodTemplate(JobSubmitterPodTemplateApplyConfiguration()))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
rayJobFromYaml := DeserializeRayJobYAML(test, rayJobManifestPath)
rayJobFromYaml.Namespace = namespace.Name

rayJob, err := test.Client().Ray().RayV1().
RayJobs(namespace.Name).
Create(test.Ctx(), rayJobFromYaml, metav1.CreateOptions{})
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

@@ -369,13 +381,20 @@ func applyRayJobToCluster(test Test, g *WithT, namespace *corev1.Namespace, rayC
return rayJob
}

// verifyS3SessionDirs verifies that directories logs/ and node_events/ exist under a session prefix in S3.
// This helper function checks that each directory contains at least one object.
// Additionally, it verifies that specific files have content:
// - logs/<nodeID>/raylet.out must exist and have content > 0 bytes
// - node_events/<nodeID>_<suffix> must exist and have content > 0 bytes (suffix can be ignored for verification)
// verifyS3SessionDirs verifies file contents in logs/, node_events/, and job_events/ directories under a session prefix in S3.
// There are two phases of verification:
// 1. Verify file contents in logs/ directory
// - logs/<nodeID>/raylet.out must exist and have content > 0 bytes
// - TODO(jwj): Complete docs.
//
// 2. Verify event type coverage in node_events/ and job_events/ directories
// - Aggregate all events from node_events/ and job_events/ directories
// - Verify that all potential event types are present in the aggregated events
//
// NOTE: Since flushed node and job events land in files nondeterministically, we aggregate them all before verifying event type coverage.
func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix string, nodeID string) {
dirs := []string{"logs", "node_events"}
// TODO(jwj): Separate verification for logs and events.
dirs := []string{"logs"}
Member
@win5923 Jan 11, 2026

Is it possible to parse the --ray-root-dir argument? If not, it would be helpful to add a comment to explain.

- --ray-root-dir=log

Contributor Author

Hi @win5923,

Thanks for the review! We have another PR that focuses on log file verification: #4351. I’ll make the clarification there instead.

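Purely as a hedged illustration of the parsing idea (not part of this PR or #4351), a helper like the one below could derive the root prefix from the collector container's args instead of hardcoding "log"; the container name, flag format, and default are assumptions, and it relies on the file's existing corev1 and strings imports:

// Hypothetical helper, not part of this PR: read --ray-root-dir from the
// collector container's args and fall back to the default "log" prefix.
func rayRootDirFromArgs(pod *corev1.Pod, containerName string) string {
	for _, c := range pod.Spec.Containers {
		if c.Name != containerName {
			continue
		}
		for _, arg := range c.Args {
			if strings.HasPrefix(arg, "--ray-root-dir=") {
				return strings.TrimPrefix(arg, "--ray-root-dir=")
			}
		}
	}
	return "log"
}
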
for _, dir := range dirs {
dirPrefix := sessionPrefix + dir + "/"

@@ -414,6 +433,36 @@ func verifyS3SessionDirs(test Test, g *WithT, s3Client *s3.S3, sessionPrefix str
LogWithTimestamp(test.T(), "Verified file %s has content: %d bytes", fileKey, fileSize)
}, TestTimeoutMedium).Should(Succeed(), "Failed to verify at least one object in directory %s has content", dirPrefix)
}

LogWithTimestamp(test.T(), "Verifying all %d event types are covered: %v", len(rayEventTypes), rayEventTypes)
nodeEventDirPrefix := fmt.Sprintf("%snode_events/", sessionPrefix)
jobEventDirPrefix := fmt.Sprintf("%sjob_events/", sessionPrefix)
g.Eventually(func(gg Gomega) {
// Enumerate all event directories to be aggregated for verification.
eventDirPrefixes := []string{nodeEventDirPrefix}
jobEventDirPrefixes, err := listS3SubdirPrefixes(s3Client, s3BucketName, jobEventDirPrefix)
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(jobEventDirPrefixes).NotTo(BeEmpty())
LogWithTimestamp(test.T(), "Found %d job event subdir prefixes: %v", len(jobEventDirPrefixes), jobEventDirPrefixes)
eventDirPrefixes = append(eventDirPrefixes, jobEventDirPrefixes...)

uploadedEvents := []rayEvent{}
for _, eventDirPrefix := range eventDirPrefixes {
events, err := loadRayEventsFromS3(s3Client, s3BucketName, eventDirPrefix)
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(events).NotTo(BeEmpty())
uploadedEvents = append(uploadedEvents, events...)
}

// Verify that all potential event types are present in the events uploaded to S3.
foundEventTypes := map[string]bool{}
for _, event := range uploadedEvents {
foundEventTypes[event.EventType] = true
}
for _, eventType := range rayEventTypes {
gg.Expect(foundEventTypes[eventType]).To(BeTrue(), "Event type %s not found", eventType)
}
}, TestTimeoutShort).Should(Succeed(), "Failed to verify all event types are covered")
}

// getSessionIDFromHeadPod retrieves the sessionID from the Ray head pod by reading the symlink
@@ -458,7 +507,42 @@ fi`
g.Expect(nodeID).NotTo(BeEmpty(), "nodeID should not be empty")

return nodeID
}

// FirstWorkerPod returns a function that gets the first worker pod from the Ray cluster.
// It adapts GetWorkerPods so it can be used by helpers that expect a single pod.
func FirstWorkerPod(test Test, rayCluster *rayv1.RayCluster) func() (*corev1.Pod, error) {
return func() (*corev1.Pod, error) {
workerPods, err := GetWorkerPods(test, rayCluster)
if err != nil {
return nil, err
}
if len(workerPods) == 0 {
return nil, fmt.Errorf("no worker pods found")
}
return &workerPods[0], nil
}
}

// killContainerAndWaitForRestart kills the main process of a container and waits for the container to restart and become ready.
func killContainerAndWaitForRestart(test Test, g *WithT, getPod func() (*corev1.Pod, error), containerName string) {
LogWithTimestamp(test.T(), "Killing main process of %s container to trigger a restart", containerName)
g.Eventually(func(gg Gomega) {
pod, err := getPod()
gg.Expect(err).NotTo(HaveOccurred())
_, stderr := ExecPodCmd(test, pod, containerName, []string{"kill", "1"})
gg.Expect(stderr.String()).To(BeEmpty())
}, TestTimeoutMedium).Should(Succeed(), "Failed to kill main process of %s container", containerName)

LogWithTimestamp(test.T(), "Waiting for %s container to restart and become ready", containerName)
g.Eventually(func(gg Gomega) {
pod, err := getPod()
gg.Expect(err).NotTo(HaveOccurred())
containerStatus, err := getContainerStatusByName(pod, containerName)
gg.Expect(err).NotTo(HaveOccurred())
gg.Expect(containerStatus.RestartCount).To(BeNumerically(">", 0))
gg.Expect(containerStatus.Ready).To(BeTrue())
}, TestTimeoutShort).Should(Succeed(), "%s container should restart and become ready", containerName)
}

// getContainerStatusByName retrieves the container status by container name.
@@ -473,3 +557,60 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) (*corev1.Co
}
return nil, fmt.Errorf("container %s not found in pod %s/%s", containerName, pod.Namespace, pod.Name)
}

// listS3SubdirPrefixes lists all subdirectory prefixes in the S3 bucket under the given prefix.
func listS3SubdirPrefixes(s3Client *s3.S3, bucket string, prefix string) ([]string, error) {
objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
Delimiter: aws.String("/"),
})
if err != nil {
return nil, err
}

subdirPrefixes := []string{}
for _, subdirPrefix := range objects.CommonPrefixes {
subdirPrefixes = append(subdirPrefixes, *subdirPrefix.Prefix)
}
return subdirPrefixes, nil
}

// loadRayEventsFromS3 loads Ray events from S3.
func loadRayEventsFromS3(s3Client *s3.S3, bucket string, prefix string) ([]rayEvent, error) {
var events []rayEvent

// List all file objects in the directory.
objects, err := s3Client.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
})
if err != nil {
return nil, err
}

for _, obj := range objects.Contents {
fileKey := aws.StringValue(obj.Key)
if strings.HasSuffix(fileKey, "/") {
continue
}

// Get the file object content and decode it into Ray events.
content, err := s3Client.GetObject(&s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(fileKey),
})
if err != nil {
return nil, err
}

var fileEvents []rayEvent
if err := json.NewDecoder(content.Body).Decode(&fileEvents); err != nil {
content.Body.Close()
return nil, err
}
content.Body.Close()
events = append(events, fileEvents...)
}

return events, nil
}