Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions historyserver/docs/set_up_collector.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,25 @@ kubectl delete -f historyserver/config/raycluster.yaml
You're supposed to see the uploaded logs and events in the minio UI as below:

![write_logs_and_events](https://github.com/ray-project/kuberay/blob/db7cb864061518ed4cfa7bf48cf05cfbfeb49f95/historyserver/docs/assets/write_logs_and_events.png)

## Troubleshooting

### "too many open files" error
If you encounter `level=fatal msg="Create fsnotify NewWatcher error too many open files"` in the collector logs, it is likely due to the inotify limits on the Kubernetes nodes.

To fix this, increase the limits on the **host nodes** (not inside the container):

```bash
# Apply changes immediately
sudo sysctl -w fs.inotify.max_user_instances=8192
sudo sysctl -w fs.inotify.max_user_watches=524288
```

To make these changes persistent across reboots, use the following lines:

```text
echo "fs.inotify.max_user_instances=8192" | sudo tee -a /etc/sysctl.conf
echo "fs.inotify.max_user_watches=524288" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
```

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider using filepath functions to handle file path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review. There are some same issues in this file. Maybe we can file a new ticket to handle it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed to handle it as a followup

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @my-vegetable-has-exploded
do you mind create an issue so that I can assign to others?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,24 @@ import (
)

type RayLogHandler struct {
Writer storage.StorageWriter
LogFiles chan string
HttpClient *http.Client
ShutdownChan chan struct{}
logFilePaths map[string]bool
MetaDir string
RayClusterName string
LogDir string
RayNodeName string
RayClusterID string
RootDir string
SessionDir string
prevLogsDir string
PushInterval time.Duration
LogBatching int
filePathMu sync.Mutex
EnableMeta bool
Writer storage.StorageWriter
LogFiles chan string
HttpClient *http.Client
ShutdownChan chan struct{}
logFilePaths map[string]bool
MetaDir string
RayClusterName string
LogDir string
RayNodeName string
RayClusterID string
RootDir string
SessionDir string
prevLogsDir string
persistCompleteLogsDir string
PushInterval time.Duration
LogBatching int
filePathMu sync.Mutex
EnableMeta bool
}

func (r *RayLogHandler) Start(stop <-chan struct{}) error {
Expand All @@ -49,6 +50,7 @@ func (r *RayLogHandler) Start(stop <-chan struct{}) error {
func (r *RayLogHandler) Run(stop <-chan struct{}) error {
// watchPath := r.LogDir
r.prevLogsDir = "/tmp/ray/prev-logs"
r.persistCompleteLogsDir = "/tmp/ray/persist-complete-logs"
Comment on lines 52 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move these to constant in historyserver/pkg/utils/utils.go similar to KunWuLuan@9b2ca52?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @justinyeh1995,

Thanks for pointing this out. I’m currently working on it (handling all constants at once) and will open a PR soon. I’m not sure whether it should be included here or handled in a separate PR.

Copy link
Contributor

@justinyeh1995 justinyeh1995 Jan 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no problem. I wasn't aware of that when I wrote the comment. Is the pr related to any issue though? I think I need to catch up quite a bit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can refer to this issue. Thanks a lot!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for heads up!


// Initialize log file paths storage
r.logFilePaths = make(map[string]bool)
Expand All @@ -62,6 +64,9 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
// Setup signal handling for SIGTERM
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM)

// WatchPrevLogsLoops will scan and process all existing prev-logs on startup,
// then watch for new files. This ensures incomplete uploads from previous runs are resumed.
go r.WatchPrevLogsLoops()
if r.EnableMeta {
go r.WatchSessionLatestLoops() // Watch session_latest symlink changes
Expand Down Expand Up @@ -310,7 +315,7 @@ func (r *RayLogHandler) WatchPrevLogsLoops() {
}

// Also check and create persist-complete-logs directory
completeLogsDir := "/tmp/ray/persist-complete-logs"
completeLogsDir := r.persistCompleteLogsDir
if _, err := os.Stat(completeLogsDir); os.IsNotExist(err) {
logrus.Infof("persist-complete-logs directory does not exist, creating it: %s", completeLogsDir)
if err := os.MkdirAll(completeLogsDir, 0o777); err != nil {
Expand Down Expand Up @@ -571,6 +576,26 @@ func (r *RayLogHandler) processSessionPrevLogs(sessionDir string) {
}
}

// isFileAlreadyPersisted checks if a file has already been persisted to persist-complete-logs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isFileAlreadyPersisted detects files already moved to persist-complete-logs and avoid duplicate uploads.

func (r *RayLogHandler) isFileAlreadyPersisted(absoluteLogPath, sessionID, nodeID string) bool {
// Calculate the relative path within the logs directory
logsDir := filepath.Join(r.prevLogsDir, sessionID, nodeID, "logs")
relativePath, err := filepath.Rel(logsDir, absoluteLogPath)
if err != nil {
logrus.Errorf("Failed to get relative path for %s: %v", absoluteLogPath, err)
return false
}

// Construct the path in persist-complete-logs
persistedPath := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs", relativePath)

// Check if the file exists
if _, err := os.Stat(persistedPath); err == nil {
return true
}
return false
}

// processPrevLogsDir processes logs in a /tmp/ray/prev-logs/{sessionid}/{nodeid} directory
func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
// Extract session ID and node ID from the path
Expand All @@ -592,13 +617,6 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
return
}

// Check if this directory has already been processed by checking in persist-complete-logs
Copy link
Contributor

@JiangJiaWei1103 JiangJiaWei1103 Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No changes are needed here. This note is just to clarify that we still need to check for leftover log files since the presence of log directories under persist-complete-logs directory alone does not guarantee that all logs have been fully persisted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! If the Collector crashes halfway through a node directory, persist-complete-logs already exists but some logs remain in prev-logs. Keeping this check would cause the Collector to skip the entire directory upon restart, leading to data loss. The current file-level check isFileAlreadyPersisted will handles this "partial success" scenario correctly without duplicate uploads.

Copy link
Contributor Author

@my-vegetable-has-exploded my-vegetable-has-exploded Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing it with following codes then you can trigger this case using cd historyserver && go test -v ./pkg/collector/logcollector/runtime/logcollector/ -run TestScanAndProcess.@JiangJiaWei1103

	completeDir := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs")
	if _, err := os.Stat(completeDir); err == nil {
		logrus.Infof("Session %s node %s logs already processed, skipping", sessionID, nodeID)
		return
	}

And e2e test also covers this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, thanks for the detailed explanation! I originally just wanted to leave a brief note for maintainers to get why we removed the code snippet here. All tests pass in my local env. Thanks!

completeDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID, "logs")
if _, err := os.Stat(completeDir); err == nil {
logrus.Infof("Session %s node %s logs already processed, skipping", sessionID, nodeID)
return
}

logrus.Infof("Processing prev-logs for session: %s, node: %s", sessionID, nodeID)

logsDir := filepath.Join(sessionNodeDir, "logs")
Expand Down Expand Up @@ -629,6 +647,12 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
return nil
}

// Check if this file has already been persisted
if r.isFileAlreadyPersisted(path, sessionID, nodeID) {
logrus.Debugf("File %s already persisted, skipping", path)
return nil
}

// Process log file
if err := r.processPrevLogFile(path, logsDir, sessionID, nodeID); err != nil {
logrus.Errorf("Failed to process prev-log file %s: %v", path, err)
Expand Down Expand Up @@ -695,7 +719,7 @@ func (r *RayLogHandler) processPrevLogFile(absoluteLogPathName, localLogDir, ses
logrus.Infof("Successfully wrote object %s, size: %d bytes", objectName, len(content))

// Move the processed file to persist-complete-logs directory to avoid re-uploading
completeBaseDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID)
completeBaseDir := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID)
completeDir := filepath.Join(completeBaseDir, "logs")

if _, err := os.Stat(completeDir); os.IsNotExist(err) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package logcollector

import (
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"
)

// MockStorageWriter is a mock implementation of storage.StorageWriter for testing
type MockStorageWriter struct {
mu sync.Mutex
createdDirs []string
writtenFiles map[string]string // path -> content
}

func NewMockStorageWriter() *MockStorageWriter {
return &MockStorageWriter{
createdDirs: make([]string, 0),
writtenFiles: make(map[string]string),
}
}

func (m *MockStorageWriter) CreateDirectory(path string) error {
m.mu.Lock()
defer m.mu.Unlock()
m.createdDirs = append(m.createdDirs, path)
return nil
}

func (m *MockStorageWriter) WriteFile(file string, reader io.ReadSeeker) error {
content, err := io.ReadAll(reader)
if err != nil {
return err
}
m.mu.Lock()
defer m.mu.Unlock()
m.writtenFiles[file] = string(content)
return nil
}

// setupRayTestEnvironment creates test directories under /tmp/ray for realistic testing
// This matches the actual paths used by the logcollector
func setupRayTestEnvironment(t *testing.T) (string, func()) {
baseDir := filepath.Join("/tmp", "ray-test-"+t.Name())

// Create base directory
if err := os.MkdirAll(baseDir, 0755); err != nil {
t.Fatalf("Failed to create base dir: %v", err)
}

// Create prev-logs and persist-complete-logs directories
prevLogsDir := filepath.Join(baseDir, "prev-logs")
persistLogsDir := filepath.Join(baseDir, "persist-complete-logs")

if err := os.MkdirAll(prevLogsDir, 0755); err != nil {
t.Fatalf("Failed to create prev-logs dir: %v", err)
}
if err := os.MkdirAll(persistLogsDir, 0755); err != nil {
t.Fatalf("Failed to create persist-complete-logs dir: %v", err)
}

cleanup := func() {
os.RemoveAll(baseDir)
}

return baseDir, cleanup
}

// createTestLogFile creates a test log file with given content
func createTestLogFile(t *testing.T, path string, content string) {
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatalf("Failed to create directory %s: %v", dir, err)
}

if err := os.WriteFile(path, []byte(content), 0644); err != nil {
t.Fatalf("Failed to write file %s: %v", path, err)
}
}

// TestIsFileAlreadyPersisted tests the file-level persistence check
func TestIsFileAlreadyPersisted(t *testing.T) {
baseDir, cleanup := setupRayTestEnvironment(t)
defer cleanup()

// Use the actual prev-logs directory structure that matches production
handler := &RayLogHandler{
prevLogsDir: filepath.Join(baseDir, "prev-logs"),
persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
}

sessionID := "session-123"
nodeID := "node-456"

// Create prev-logs structure
prevLogsPath := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs", "worker.log")
createTestLogFile(t, prevLogsPath, "test log content")

// Test case 1: File not yet persisted
if handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
t.Error("Expected file to not be persisted yet")
}

// Create the persisted file in persist-complete-logs
persistedPath := filepath.Join(baseDir, "persist-complete-logs", sessionID, nodeID, "logs", "worker.log")
createTestLogFile(t, persistedPath, "test log content")

// Test case 2: File already persisted
if !handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
t.Error("Expected file to be detected as persisted")
}
}

// TestScanAndProcess tests the full lifecycle: partial upload, interruption, and resumption via scan
func TestScanAndProcess(t *testing.T) {
baseDir, cleanup := setupRayTestEnvironment(t)
defer cleanup()

mockWriter := NewMockStorageWriter()
handler := &RayLogHandler{
Writer: mockWriter,
RootDir: "/test-root",
prevLogsDir: filepath.Join(baseDir, "prev-logs"),
persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
ShutdownChan: make(chan struct{}),
RayClusterName: "test-cluster",
RayClusterID: "cluster-123",
}

sessionID := "session-lifecycle"
nodeID := "node-1"
logsDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs")

// Prepare two files
f1 := filepath.Join(logsDir, "file1.log")
f2 := filepath.Join(logsDir, "file2.log")
createTestLogFile(t, f1, "content1")
createTestLogFile(t, f2, "content2")

// --- Step 1: Process file1 only (simulating partial success) ---
err := handler.processPrevLogFile(f1, logsDir, sessionID, nodeID)
if err != nil {
t.Fatalf("Failed to process file1: %v", err)
}

// Verify file1 is in storage
if len(mockWriter.writtenFiles) != 1 {
t.Errorf("Expected 1 file in storage, got %d", len(mockWriter.writtenFiles))
}

// Manually restore file1 to prev-logs to simulate a crash right after upload but before/during rename
createTestLogFile(t, f1, "content1")

// --- Step 2: Run startup scan in background ---
go handler.WatchPrevLogsLoops()

// Wait for async processing - give more time for file2.log to be processed
time.Sleep(1000 * time.Millisecond)

// --- Step 3: Final Verification ---
// 1. Storage should have 2 unique files (file1 should NOT be re-uploaded)
if len(mockWriter.writtenFiles) != 2 {
t.Errorf("Expected 2 unique files in storage, got %d", len(mockWriter.writtenFiles))
}

// 2. The node directory in prev-logs should be removed now that all files are processed
sessionNodeDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID)
if _, err := os.Stat(sessionNodeDir); !os.IsNotExist(err) {
t.Error("Node directory should be removed after all files are processed and moved")
}

// Ensure background goroutine exits
close(handler.ShutdownChan)
time.Sleep(100 * time.Millisecond)
}
Loading
Loading