Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider using filepath functions to handle file path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for review. There are some same issues in this file. Maybe we can file a new ticket to handle it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed to handle it as a followup

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @my-vegetable-has-exploded
do you mind create an issue so that I can assign to others?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,24 @@ import (
)

type RayLogHandler struct {
Writer storage.StorageWriter
LogFiles chan string
HttpClient *http.Client
ShutdownChan chan struct{}
logFilePaths map[string]bool
MetaDir string
RayClusterName string
LogDir string
RayNodeName string
RayClusterID string
RootDir string
SessionDir string
prevLogsDir string
PushInterval time.Duration
LogBatching int
filePathMu sync.Mutex
EnableMeta bool
Writer storage.StorageWriter
LogFiles chan string
HttpClient *http.Client
ShutdownChan chan struct{}
logFilePaths map[string]bool
MetaDir string
RayClusterName string
LogDir string
RayNodeName string
RayClusterID string
RootDir string
SessionDir string
prevLogsDir string
persistCompleteLogsDir string
PushInterval time.Duration
LogBatching int
filePathMu sync.Mutex
EnableMeta bool
}

func (r *RayLogHandler) Start(stop <-chan struct{}) error {
Expand All @@ -49,6 +50,7 @@ func (r *RayLogHandler) Start(stop <-chan struct{}) error {
func (r *RayLogHandler) Run(stop <-chan struct{}) error {
// watchPath := r.LogDir
r.prevLogsDir = "/tmp/ray/prev-logs"
r.persistCompleteLogsDir = "/tmp/ray/persist-complete-logs"
Comment on lines 52 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move these to constant in historyserver/pkg/utils/utils.go similar to KunWuLuan@9b2ca52?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @justinyeh1995,

Thanks for pointing this out. I’m currently working on it (handling all constants at once) and will open a PR soon. I’m not sure whether it should be included here or handled in a separate PR.

Copy link
Contributor

@justinyeh1995 justinyeh1995 Jan 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no problem. I wasn't aware of that when I wrote the comment. Is the pr related to any issue though? I think I need to catch up quite a bit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can refer to this issue. Thanks a lot!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for heads up!


// Initialize log file paths storage
r.logFilePaths = make(map[string]bool)
Expand All @@ -62,6 +64,11 @@ func (r *RayLogHandler) Run(stop <-chan struct{}) error {
// Setup signal handling for SIGTERM
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM)

// On startup, scan and process all existing prev-logs
// This ensures that any incomplete uploads from previous runs are resumed
go r.scanAndProcessExistingPrevLogs()

go r.WatchPrevLogsLoops()
if r.EnableMeta {
go r.WatchSessionLatestLoops() // Watch session_latest symlink changes
Expand Down Expand Up @@ -293,6 +300,43 @@ func (r *RayLogHandler) WatchLogsLoops(watcher *fsnotify.Watcher, walkPath strin
}
}

// scanAndProcessExistingPrevLogs scans the prev-logs directory on startup
// and processes any existing sessions/nodes that may have been left from previous runs.
// This ensures resumption of incomplete uploads after collector restart.
func (r *RayLogHandler) scanAndProcessExistingPrevLogs() {
watchPath := r.prevLogsDir

logrus.Infof("Starting initial scan of prev-logs directory: %s", watchPath)

// Check if prev-logs directory exists
if _, err := os.Stat(watchPath); os.IsNotExist(err) {
logrus.Infof("prev-logs directory does not exist on startup, nothing to process")
return
}

// Read all session directories (first level only)
sessionEntries, err := os.ReadDir(watchPath)
if err != nil {
logrus.Errorf("Failed to read prev-logs directory %s: %v", watchPath, err)
return
}

for _, sessionEntry := range sessionEntries {
if !sessionEntry.IsDir() {
continue
}

sessionID := sessionEntry.Name()
sessionPath := filepath.Join(watchPath, sessionID)
logrus.Infof("Found existing session directory on startup: %s", sessionID)

// Process all node directories under this session
r.processSessionPrevLogs(sessionPath)
}

logrus.Infof("Completed initial scan of prev-logs directory")
}

func (r *RayLogHandler) WatchPrevLogsLoops() {
watchPath := r.prevLogsDir

Expand All @@ -310,7 +354,7 @@ func (r *RayLogHandler) WatchPrevLogsLoops() {
}

// Also check and create persist-complete-logs directory
completeLogsDir := "/tmp/ray/persist-complete-logs"
completeLogsDir := r.persistCompleteLogsDir
if _, err := os.Stat(completeLogsDir); os.IsNotExist(err) {
logrus.Infof("persist-complete-logs directory does not exist, creating it: %s", completeLogsDir)
if err := os.MkdirAll(completeLogsDir, 0o777); err != nil {
Expand Down Expand Up @@ -571,6 +615,26 @@ func (r *RayLogHandler) processSessionPrevLogs(sessionDir string) {
}
}

// isFileAlreadyPersisted checks if a file has already been persisted to persist-complete-logs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isFileAlreadyPersisted detects files already moved to persist-complete-logs and avoid duplicate uploads.

func (r *RayLogHandler) isFileAlreadyPersisted(absoluteLogPath, sessionID, nodeID string) bool {
// Calculate the relative path within the logs directory
logsDir := filepath.Join(r.prevLogsDir, sessionID, nodeID, "logs")
relativePath, err := filepath.Rel(logsDir, absoluteLogPath)
if err != nil {
logrus.Errorf("Failed to get relative path for %s: %v", absoluteLogPath, err)
return false
}

// Construct the path in persist-complete-logs
persistedPath := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs", relativePath)

// Check if the file exists
if _, err := os.Stat(persistedPath); err == nil {
return true
}
return false
}

// processPrevLogsDir processes logs in a /tmp/ray/prev-logs/{sessionid}/{nodeid} directory
func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
// Extract session ID and node ID from the path
Expand All @@ -592,13 +656,6 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
return
}

// Check if this directory has already been processed by checking in persist-complete-logs
Copy link
Contributor

@JiangJiaWei1103 JiangJiaWei1103 Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No changes are needed here. This note is just to clarify that we still need to check for leftover log files since the presence of log directories under persist-complete-logs directory alone does not guarantee that all logs have been fully persisted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! If the Collector crashes halfway through a node directory, persist-complete-logs already exists but some logs remain in prev-logs. Keeping this check would cause the Collector to skip the entire directory upon restart, leading to data loss. The current file-level check isFileAlreadyPersisted will handles this "partial success" scenario correctly without duplicate uploads.

Copy link
Contributor Author

@my-vegetable-has-exploded my-vegetable-has-exploded Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing it with following codes then you can trigger this case using cd historyserver && go test -v ./pkg/collector/logcollector/runtime/logcollector/ -run TestScanAndProcess.@JiangJiaWei1103

	completeDir := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID, "logs")
	if _, err := os.Stat(completeDir); err == nil {
		logrus.Infof("Session %s node %s logs already processed, skipping", sessionID, nodeID)
		return
	}

And e2e test also covers this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, thanks for the detailed explanation! I originally just wanted to leave a brief note for maintainers to get why we removed the code snippet here. All tests pass in my local env. Thanks!

completeDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID, "logs")
if _, err := os.Stat(completeDir); err == nil {
logrus.Infof("Session %s node %s logs already processed, skipping", sessionID, nodeID)
return
}

logrus.Infof("Processing prev-logs for session: %s, node: %s", sessionID, nodeID)

logsDir := filepath.Join(sessionNodeDir, "logs")
Expand Down Expand Up @@ -629,6 +686,12 @@ func (r *RayLogHandler) processPrevLogsDir(sessionNodeDir string) {
return nil
}

// Check if this file has already been persisted
if r.isFileAlreadyPersisted(path, sessionID, nodeID) {
logrus.Debugf("File %s already persisted, skipping", path)
return nil
}

// Process log file
if err := r.processPrevLogFile(path, logsDir, sessionID, nodeID); err != nil {
logrus.Errorf("Failed to process prev-log file %s: %v", path, err)
Expand Down Expand Up @@ -695,7 +758,7 @@ func (r *RayLogHandler) processPrevLogFile(absoluteLogPathName, localLogDir, ses
logrus.Infof("Successfully wrote object %s, size: %d bytes", objectName, len(content))

// Move the processed file to persist-complete-logs directory to avoid re-uploading
completeBaseDir := filepath.Join("/tmp/ray/persist-complete-logs", sessionID, nodeID)
completeBaseDir := filepath.Join(r.persistCompleteLogsDir, sessionID, nodeID)
completeDir := filepath.Join(completeBaseDir, "logs")

if _, err := os.Stat(completeDir); os.IsNotExist(err) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package logcollector

import (
"io"
"os"
"path/filepath"
"sync"
"testing"
"time"
)

// MockStorageWriter is a mock implementation of storage.StorageWriter for testing
type MockStorageWriter struct {
mu sync.Mutex
createdDirs []string
writtenFiles map[string]string // path -> content
}

func NewMockStorageWriter() *MockStorageWriter {
return &MockStorageWriter{
createdDirs: make([]string, 0),
writtenFiles: make(map[string]string),
}
}

func (m *MockStorageWriter) CreateDirectory(path string) error {
m.mu.Lock()
defer m.mu.Unlock()
m.createdDirs = append(m.createdDirs, path)
return nil
}

func (m *MockStorageWriter) WriteFile(file string, reader io.ReadSeeker) error {
content, err := io.ReadAll(reader)
if err != nil {
return err
}
m.mu.Lock()
defer m.mu.Unlock()
m.writtenFiles[file] = string(content)
return nil
}

// setupRayTestEnvironment creates test directories under /tmp/ray for realistic testing
// This matches the actual paths used by the logcollector
func setupRayTestEnvironment(t *testing.T) (string, func()) {
baseDir := filepath.Join("/tmp", "ray-test-"+t.Name())

// Create base directory
if err := os.MkdirAll(baseDir, 0755); err != nil {
t.Fatalf("Failed to create base dir: %v", err)
}

// Create prev-logs and persist-complete-logs directories
prevLogsDir := filepath.Join(baseDir, "prev-logs")
persistLogsDir := filepath.Join(baseDir, "persist-complete-logs")

if err := os.MkdirAll(prevLogsDir, 0755); err != nil {
t.Fatalf("Failed to create prev-logs dir: %v", err)
}
if err := os.MkdirAll(persistLogsDir, 0755); err != nil {
t.Fatalf("Failed to create persist-complete-logs dir: %v", err)
}

cleanup := func() {
os.RemoveAll(baseDir)
}

return baseDir, cleanup
}

// createTestLogFile creates a test log file with given content
func createTestLogFile(t *testing.T, path string, content string) {
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatalf("Failed to create directory %s: %v", dir, err)
}

if err := os.WriteFile(path, []byte(content), 0644); err != nil {
t.Fatalf("Failed to write file %s: %v", path, err)
}
}

// TestIsFileAlreadyPersisted tests the file-level persistence check
func TestIsFileAlreadyPersisted(t *testing.T) {
baseDir, cleanup := setupRayTestEnvironment(t)
defer cleanup()

// Use the actual prev-logs directory structure that matches production
handler := &RayLogHandler{
prevLogsDir: filepath.Join(baseDir, "prev-logs"),
persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
}

sessionID := "session-123"
nodeID := "node-456"

// Create prev-logs structure
prevLogsPath := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs", "worker.log")
createTestLogFile(t, prevLogsPath, "test log content")

// Test case 1: File not yet persisted
if handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
t.Error("Expected file to not be persisted yet")
}

// Create the persisted file in persist-complete-logs
persistedPath := filepath.Join(baseDir, "persist-complete-logs", sessionID, nodeID, "logs", "worker.log")
createTestLogFile(t, persistedPath, "test log content")

// Test case 2: File already persisted
if !handler.isFileAlreadyPersisted(prevLogsPath, sessionID, nodeID) {
t.Error("Expected file to be detected as persisted")
}
}

// TestScanAndProcess tests the full lifecycle: partial upload, interruption, and resumption via scan
func TestScanAndProcess(t *testing.T) {
baseDir, cleanup := setupRayTestEnvironment(t)
defer cleanup()

mockWriter := NewMockStorageWriter()
handler := &RayLogHandler{
Writer: mockWriter,
RootDir: "/test-root",
prevLogsDir: filepath.Join(baseDir, "prev-logs"),
persistCompleteLogsDir: filepath.Join(baseDir, "persist-complete-logs"),
ShutdownChan: make(chan struct{}),
RayClusterName: "test-cluster",
RayClusterID: "cluster-123",
}

sessionID := "session-lifecycle"
nodeID := "node-1"
logsDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID, "logs")

// Prepare two files
f1 := filepath.Join(logsDir, "file1.log")
f2 := filepath.Join(logsDir, "file2.log")
createTestLogFile(t, f1, "content1")
createTestLogFile(t, f2, "content2")

// --- Step 1: Process file1 only (simulating partial success) ---
err := handler.processPrevLogFile(f1, logsDir, sessionID, nodeID)
if err != nil {
t.Fatalf("Failed to process file1: %v", err)
}

// Verify file1 is in storage
if len(mockWriter.writtenFiles) != 1 {
t.Errorf("Expected 1 file in storage, got %d", len(mockWriter.writtenFiles))
}

// Manually restore file1 to prev-logs to simulate a crash right after upload but before/during rename
createTestLogFile(t, f1, "content1")

// --- Step 2: Run startup scan ---
handler.scanAndProcessExistingPrevLogs()

// Wait for async processing
time.Sleep(200 * time.Millisecond)

// --- Step 3: Final Verification ---
// 1. Storage should have 2 unique files (file1 should NOT be re-uploaded)
if len(mockWriter.writtenFiles) != 2 {
t.Errorf("Expected 2 unique files in storage, got %d", len(mockWriter.writtenFiles))
}

// 2. The node directory in prev-logs should be removed now that all files are processed
sessionNodeDir := filepath.Join(handler.prevLogsDir, sessionID, nodeID)
if _, err := os.Stat(sessionNodeDir); !os.IsNotExist(err) {
t.Error("Node directory should be removed after all files are processed and moved")
}
}
Loading