Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions historyserver/docs/set_up_collector.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,26 @@ kubectl delete -f historyserver/config/raycluster.yaml
You're supposed to see the uploaded logs and events in the minio UI as below:

![write_logs_and_events](https://github.com/ray-project/kuberay/blob/db7cb864061518ed4cfa7bf48cf05cfbfeb49f95/historyserver/docs/assets/write_logs_and_events.png)

## Troubleshooting

### "too many open files" error

If you encounter `level=fatal msg="Create fsnotify NewWatcher error too many open files"` in the collector logs,
it is likely due to the inotify limits on the Kubernetes nodes.

To fix this, increase the limits on the **host nodes** (not inside the container):

```bash
# Apply changes immediately
sudo sysctl -w fs.inotify.max_user_instances=8192
sudo sysctl -w fs.inotify.max_user_watches=524288
```

To make these changes persistent across reboots, use the following lines:

```text
echo "fs.inotify.max_user_instances=8192" | sudo tee -a /etc/sysctl.conf
echo "fs.inotify.max_user_watches=524288" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
```
Comment on lines +144 to +166
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very cool, never heard this, I learned something

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider using filepath functions to handle file path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review. There are some same issues in this file. Maybe we can file a new ticket to handle it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed to handle it as a followup

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @my-vegetable-has-exploded
do you mind create an issue so that I can assign to others?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,24 @@ import (
)

type RayLogHandler struct {
Writer storage.StorageWriter
LogFiles chan string
HttpClient *http.Client
ShutdownChan chan struct{}
logFilePaths map[string]bool
MetaDir string
RayClusterName string
LogDir string
RayNodeName string
RayClusterID string
RootDir string
SessionDir string
prevLogsDir string
PushInterval time.Duration
LogBatching int
filePathMu sync.Mutex
EnableMeta bool
Writer storage.StorageWriter
LogFiles chan string
HttpClient *http.Client
ShutdownChan chan struct{}
logFilePaths map[string]bool
MetaDir string
RayClusterName string
LogDir string
RayNodeName string
RayClusterID string
RootDir string
SessionDir string
prevLogsDir string
persistCompleteLogsDir string
PushInterval time.Duration
LogBatching int
filePathMu sync.Mutex
EnableMeta bool
}

func (r *RayLogHandler) Start(stop <-chan struct{}) error {
Expand All @@ -49,6 +50,7 @@ func (r *RayLogHandler) Start(stop <-chan struct{}) error {
func (r *RayLogHandler) Run(stop <-chan struct{}) error {
// watchPath := r.LogDir
r.prevLogsDir = "/tmp/ray/prev-logs"
r.persistCompleteLogsDir = "/tmp/ray/persist-complete-logs"
Comment on lines 52 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move these to constant in historyserver/pkg/utils/utils.go similar to KunWuLuan@9b2ca52?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @justinyeh1995,

Thanks for pointing this out. I’m currently working on it (handling all constants at once) and will open a PR soon. I’m not sure whether it should be included here or handled in a separate PR.

Copy link
Contributor

@justinyeh1995 justinyeh1995 Jan 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no problem. I wasn't aware of that when I wrote the comment. Is the pr related to any issue though? I think I need to catch up quite a bit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can refer to this issue. Thanks a lot!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for heads up!


// Initialize log file paths storage
r.logFilePaths = make(map[string]bool)
Expand All @@ -62,6 +64,11 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
// Setup signal handling for SIGTERM
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM)

// WatchPrevLogsLoops performs an initial scan of the prev-logs directory on startup
// to process leftover log files in prev-logs/{sessionID}/{nodeID}/logs/ directories.
// After scanning, it watches for new directories and files. This ensures incomplete
// uploads from previous runs are resumed.
go r.WatchPrevLogsLoops()
if r.EnableMeta {
go r.WatchSessionLatestLoops() // Watch session_latest symlink changes
Expand Down Expand Up @@ -231,7 +238,7 @@ func (r *RayLogHandler) WatchPrevLogsLoops() {
}

// Also check and create persist-complete-logs directory
completeLogsDir := "/tmp/ray/persist-complete-logs"
completeLogsDir := r.persistCompleteLogsDir
if _, err := os.Stat(completeLogsDir); os.IsNotExist(err) {
logrus.Infof("persist-complete-logs directory does not exist, creating it: %s", completeLogsDir)
if err := os.MkdirAll(completeLogsDir, 0o777); err != nil {
Expand Down Expand Up @@ -492,6 +499,38 @@ func (r *RayLogHandler) processSessionPrevLogs(sessionDir string) {
}
}

// isFileAlreadyPersisted checks if a log file has already been uploaded to storage and moved to
// the persist-complete-logs directory. This prevents duplicate uploads during collector restarts.
//
// When a log file is successfully uploaded, it is moved from prev-logs to persist-complete-logs
// to mark it as processed. This function checks if the equivalent file path exists in the
// persist-complete-logs directory.
//
// Example:
//
// Given absoluteLogPath = "/tmp/ray/prev-logs/session_123/node_456/logs/raylet.out"
// This function checks if "/tmp/ray/persist-complete-logs/session_123/node_456/logs/raylet.out" exists
// - If exists: returns true (file was already uploaded, skip it)
// - If not exists: returns false (file needs to be uploaded)
func (r *RayLogHandler) isFileAlreadyPersisted(absoluteLogPath, sessionID, nodeID string) bool {
// Calculate the relative path within the logs directory
logsDir := filepath.Join(r.prevLogsDir, sessionID, nodeID, "logs")
relativeLogPath, err := filepath.Rel(logsDir, absoluteLogPath)
if err != nil {
logrus.Errorf("Failed to get relative path for %s: %v", absoluteLogPath, err)
return false
}

// Construct the path in persist-complete-logs
persistedPath := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs", relativeLogPath)

// Check if the file exists
if _, err := os.Stat(persistedPath); err == nil {
return true
}
return false
}

// processPrevLogsDir processes logs in a /tmp/ray/prev-logs/{sessionid}/{nodeid} directory
func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
// Extract session ID and node ID from the path
Expand All @@ -513,13 +552,6 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
return
}

// Check if this directory has already been processed by checking in persist-complete-logs
Copy link
Contributor

@JiangJiaWei1103 JiangJiaWei1103 Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No changes are needed here. This note is just to clarify that we still need to check for leftover log files since the presence of log directories under persist-complete-logs directory alone does not guarantee that all logs have been fully persisted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! If the Collector crashes halfway through a node directory, persist-complete-logs already exists but some logs remain in prev-logs. Keeping this check would cause the Collector to skip the entire directory upon restart, leading to data loss. The current file-level check isFileAlreadyPersisted will handles this "partial success" scenario correctly without duplicate uploads.

Copy link
Contributor Author

@my-vegetable-has-exploded my-vegetable-has-exploded Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing it with following codes then you can trigger this case using cd historyserver && go test -v ./pkg/collector/logcollector/runtime/logcollector/ -run TestScanAndProcess.@JiangJiaWei1103

	completeDir := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs")
	if _, err := os.Stat(completeDir); err == nil {
		logrus.Infof("Session %s node %s logs already processed, skipping", sessionID, nodeID)
		return
	}

And e2e test also covers this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, thanks for the detailed explanation! I originally just wanted to leave a brief note for maintainers to get why we removed the code snippet here. All tests pass in my local env. Thanks!

completeDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID, "logs")
if _, err := os.Stat(completeDir); err == nil {
logrus.Infof("Session %s node %s logs already processed, skipping", sessionID, nodeID)
return
}

logrus.Infof("Processing prev-logs for session: %s, node: %s", sessionID, nodeID)

logsDir := filepath.Join(sessionNodeDir, "logs")
Expand Down Expand Up @@ -550,6 +582,12 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
return nil
}

// Check if this file has already been persisted
if r.isFileAlreadyPersisted(path, sessionID, nodeID) {
logrus.Debugf("File %s already persisted, skipping", path)
return nil
}

// Process log file
if err := r.processPrevLogFile(path, logsDir, sessionID, nodeID); err != nil {
logrus.Errorf("Failed to process prev-log file %s: %v", path, err)
Expand Down Expand Up @@ -616,7 +654,7 @@ func (r *RayLogHandler) processPrevLogFile(absoluteLogPathName, localLogDir, ses
logrus.Infof("Successfully wrote object %s, size: %d bytes", objectName, len(content))

// Move the processed file to persist-complete-logs directory to avoid re-uploading
completeBaseDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID)
completeBaseDir := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID)
completeDir := filepath.Join(completeBaseDir, "logs")

if _, err := os.Stat(completeDir); os.IsNotExist(err) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package logcollector

import (
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"

. "github.com/onsi/gomega"
)

// MockStorageWriter is a mock implementation of storage.StorageWriter for testing
type MockStorageWriter struct {
mu sync.Mutex
createdDirs []string
writtenFiles map[string]string // path -> content
}

func NewMockStorageWriter() *MockStorageWriter {
return &MockStorageWriter{
createdDirs: make([]string, 0),
writtenFiles: make(map[string]string),
}
}

func (m *MockStorageWriter) CreateDirectory(path string) error {
m.mu.Lock()
defer m.mu.Unlock()
m.createdDirs = append(m.createdDirs, path)
return nil
}

func (m *MockStorageWriter) WriteFile(file string, reader io.ReadSeeker) error {
content, err := io.ReadAll(reader)
if err != nil {
return err
}
m.mu.Lock()
defer m.mu.Unlock()
m.writtenFiles[file] = string(content)
return nil
}

// setupRayTestEnvironment creates test directories under /tmp/ray for realistic testing
// This matches the actual paths used by the logcollector
func setupRayTestEnvironment(t *testing.T) (string, func()) {
baseDir := filepath.Join("/tmp", "ray-test-"+t.Name())

// Create base directory
if err := os.MkdirAll(baseDir, 0755); err != nil {
t.Fatalf("Failed to create base dir: %v", err)
}

// Create prev-logs and persist-complete-logs directories
prevLogsDir := filepath.Join(baseDir, "prev-logs")
persistLogsDir := filepath.Join(baseDir, "persist-complete-logs")

if err := os.MkdirAll(prevLogsDir, 0755); err != nil {
t.Fatalf("Failed to create prev-logs dir: %v", err)
}
if err := os.MkdirAll(persistLogsDir, 0755); err != nil {
t.Fatalf("Failed to create persist-complete-logs dir: %v", err)
}

cleanup := func() {
os.RemoveAll(baseDir)
}

return baseDir, cleanup
}

// createTestLogFile creates a test log file with given content
func createTestLogFile(t *testing.T, path string, content string) {
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatalf("Failed to create directory %s: %v", dir, err)
}

if err := os.WriteFile(path, []byte(content), 0644); err != nil {
t.Fatalf("Failed to write file %s: %v", path, err)
}
}

// TestIsFileAlreadyPersisted tests the file-level persistence check
func TestIsFileAlreadyPersisted(t *testing.T) {
baseDir, cleanup := setupRayTestEnvironment(t)
defer cleanup()

// Use the actual prev-logs directory structure that matches production
handler := &RayLogHandler{
prevLogsDir: filepath.Join(baseDir, "prev-logs"),
persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
}

sessionID := "session-123"
nodeID := "node-456"

// Create prev-logs structure
prevLogsPath := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs", "worker.log")
createTestLogFile(t, prevLogsPath, "test log content")

// Test case 1: File not yet persisted
if handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
t.Error("Expected file to not be persisted yet")
}

// Create the persisted file in persist-complete-logs
persistedPath := filepath.Join(baseDir, "persist-complete-logs", sessionID, nodeID, "logs", "worker.log")
createTestLogFile(t, persistedPath, "test log content")

// Test case 2: File already persisted
if !handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
t.Error("Expected file to be detected as persisted")
}
}

// TestScanAndProcess tests the full lifecycle: partial upload, interruption, and resumption via scan.
//
// This test simulates a crash recovery scenario:
// 1. Two log files exist in prev-logs
// 2. Only file1 is processed (simulating partial success before crash)
// 3. File1 is restored to prev-logs (simulating incomplete rename during crash)
// 4. WatchPrevLogsLoops is started (simulating collector restart)
// 5. Verify that file1 is NOT re-uploaded (idempotency) and file2 is uploaded
// 6. Verify that the node directory is cleaned up after all files are processed
func TestScanAndProcess(t *testing.T) {
g := NewWithT(t)

baseDir, cleanup := setupRayTestEnvironment(t)
defer cleanup()

mockWriter := NewMockStorageWriter()
handler := &RayLogHandler{
Writer: mockWriter,
RootDir: "/test-root",
prevLogsDir: filepath.Join(baseDir, "prev-logs"),
persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
ShutdownChan: make(chan struct{}),
RayClusterName: "test-cluster",
RayClusterID: "cluster-123",
}

sessionID := "session-lifecycle"
nodeID := "node-1"
logsDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs")

// Prepare two log files in prev-logs directory
f1 := filepath.Join(logsDir, "file1.log")
f2 := filepath.Join(logsDir, "file2.log")
createTestLogFile(t, f1, "content1")
createTestLogFile(t, f2, "content2")

// --- Step 1: Process file1 only (simulating partial success before crash) ---
err := handler.processPrevLogFile(f1, logsDir, sessionID, nodeID)
if err != nil {
t.Fatalf("Failed to process file1: %v", err)
}

// Verify file1 is uploaded to storage
if len(mockWriter.writtenFiles) != 1 {
t.Errorf("Expected 1 file in storage, got %d", len(mockWriter.writtenFiles))
}

// Manually restore file1 to prev-logs to simulate a crash right after upload
// but before the rename operation completed
createTestLogFile(t, f1, "content1")

// --- Step 2: Start the startup scan in background (simulating collector restart) ---
go handler.WatchPrevLogsLoops()

// --- Step 3: Use Eventually to wait for async processing ---
sessionNodeDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID)

// Wait until storage has exactly 2 files.
// file1 should NOT be re-uploaded because it already exists in persist-complete-logs.
// Only file2 should be newly uploaded.
g.Eventually(func() int {
mockWriter.mu.Lock()
defer mockWriter.mu.Unlock()
return len(mockWriter.writtenFiles)
}, 5*time.Second, 100*time.Millisecond).Should(Equal(2),
"Storage should have 2 unique files (file1 should NOT be re-uploaded due to idempotency check)")

// Wait until the node directory in prev-logs is removed.
// After all files are processed and moved to persist-complete-logs,
// the node directory should be cleaned up.
g.Eventually(func() bool {
_, err := os.Stat(sessionNodeDir)
return os.IsNotExist(err)
}, 5*time.Second, 100*time.Millisecond).Should(BeTrue(),
"Node directory should be removed after all files are processed and moved to persist-complete-logs")

// Signal the background goroutine to exit gracefully
close(handler.ShutdownChan)
}
Loading
Loading