diff --git a/components/execd/pkg/runtime/command_common.go b/components/execd/pkg/runtime/command_common.go index ba272b8d..6467d47f 100644 --- a/components/execd/pkg/runtime/command_common.go +++ b/components/execd/pkg/runtime/command_common.go @@ -16,9 +16,11 @@ package runtime import ( "bufio" + "bytes" "io" "os" "path/filepath" + "sync" "time" ) @@ -28,13 +30,14 @@ func (c *Controller) tailStdPipe(file string, onExecute func(text string), done ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() + mutex := &sync.Mutex{} for { select { case <-done: - c.readFromPos(file, lastPos, onExecute) + c.readFromPos(mutex, file, lastPos, onExecute) return case <-ticker.C: - newPos := c.readFromPos(file, lastPos, onExecute) + newPos := c.readFromPos(mutex, file, lastPos, onExecute) lastPos = newPos } } @@ -81,7 +84,12 @@ func (c *Controller) stderrFileName(session string) string { } // readFromPos streams new content from a file starting at startPos. -func (c *Controller) readFromPos(filepath string, startPos int64, onExecute func(string)) int64 { +func (c *Controller) readFromPos(mutex *sync.Mutex, filepath string, startPos int64, onExecute func(string)) int64 { + if !mutex.TryLock() { + return -1 + } + defer mutex.Unlock() + file, err := os.Open(filepath) if err != nil { return startPos @@ -90,32 +98,39 @@ func (c *Controller) readFromPos(filepath string, startPos int64, onExecute func _, _ = file.Seek(startPos, 0) //nolint:errcheck - scanner := bufio.NewScanner(file) - // Support long lines and treat both \n and \r as delimiters to keep progress output. - scanner.Buffer(make([]byte, 0, 64*1024), 5*1024*1024) // 5MB max token - scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { - for i, b := range data { - if b == '\n' || b == '\r' { - // Treat \r\n as a single delimiter to avoid empty tokens. - if b == '\r' && i+1 < len(data) && data[i+1] == '\n' { - return i + 2, data[:i], nil - } - return i + 1, data[:i], nil + reader := bufio.NewReader(file) + var buffer bytes.Buffer + var currentPos int64 = startPos + + for { + b, err := reader.ReadByte() + if err != nil { + if err == io.EOF { + // If buffer has content but no newline, it's an incomplete line, don't output + break } + break } - if atEOF && len(data) > 0 { - return len(data), data, nil + currentPos++ + + // Check if it's a line terminator (\n or \r) + if b == '\n' || b == '\r' { + // If buffer has content, output this line + if buffer.Len() > 0 { + onExecute(buffer.String()) + buffer.Reset() + } + // Skip line terminator + continue } - return 0, nil, nil - }) - for scanner.Scan() { - onExecute(scanner.Text()) - } - if err := scanner.Err(); err != nil { - return startPos + buffer.WriteByte(b) } endPos, _ := file.Seek(0, 1) + // If the last read position doesn't end with a newline, return the buffer start position + if buffer.Len() > 0 { + return currentPos - int64(buffer.Len()) + } return endPos } diff --git a/components/execd/pkg/runtime/command_test.go b/components/execd/pkg/runtime/command_test.go index c7040ce6..96c22deb 100644 --- a/components/execd/pkg/runtime/command_test.go +++ b/components/execd/pkg/runtime/command_test.go @@ -20,6 +20,7 @@ import ( "os/exec" "path/filepath" "strings" + "sync" "testing" "time" @@ -32,6 +33,8 @@ func TestReadFromPos_SplitsOnCRAndLF(t *testing.T) { tmp := t.TempDir() logFile := filepath.Join(tmp, "stdout.log") + mutex := &sync.Mutex{} + initial := "line1\nprog 10%\rprog 20%\rprog 30%\nlast\n" if err := os.WriteFile(logFile, []byte(initial), 0o644); err != nil { t.Fatalf("write initial file: %v", err) @@ -39,7 +42,7 @@ func TestReadFromPos_SplitsOnCRAndLF(t *testing.T) { var got []string c := &Controller{} - nextPos := c.readFromPos(logFile, 0, func(s string) { got = append(got, s) }) + nextPos := c.readFromPos(mutex, logFile, 0, func(s string) { got = append(got, s) }) want := []string{"line1", "prog 10%", "prog 20%", "prog 30%", "last"} if len(got) != len(want) { @@ -64,7 +67,7 @@ func TestReadFromPos_SplitsOnCRAndLF(t *testing.T) { _ = f.Close() got = got[:0] - c.readFromPos(logFile, nextPos, func(s string) { got = append(got, s) }) + c.readFromPos(mutex, logFile, nextPos, func(s string) { got = append(got, s) }) want = []string{"tail1", "tail2"} if len(got) != len(want) { t.Fatalf("incremental token count: got %d want %d", len(got), len(want)) @@ -88,7 +91,7 @@ func TestReadFromPos_LongLine(t *testing.T) { var got []string c := &Controller{} - c.readFromPos(logFile, 0, func(s string) { got = append(got, s) }) + c.readFromPos(&sync.Mutex{}, logFile, 0, func(s string) { got = append(got, s) }) if len(got) != 1 { t.Fatalf("expected one token, got %d", len(got))