Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions components/execd/pkg/runtime/command_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package runtime

import (
"bufio"
"bytes"
"io"
"os"
"path/filepath"
"sync"
"time"
)

Expand All @@ -28,13 +30,14 @@ func (c *Controller) tailStdPipe(file string, onExecute func(text string), done
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

mutex := &sync.Mutex{}
for {
select {
case <-done:
c.readFromPos(file, lastPos, onExecute)
c.readFromPos(mutex, file, lastPos, onExecute)
return
case <-ticker.C:
newPos := c.readFromPos(file, lastPos, onExecute)
newPos := c.readFromPos(mutex, file, lastPos, onExecute)
lastPos = newPos
}
}
Expand Down Expand Up @@ -81,7 +84,12 @@ func (c *Controller) stderrFileName(session string) string {
}

// readFromPos streams new content from a file starting at startPos.
func (c *Controller) readFromPos(filepath string, startPos int64, onExecute func(string)) int64 {
func (c *Controller) readFromPos(mutex *sync.Mutex, filepath string, startPos int64, onExecute func(string)) int64 {
if !mutex.TryLock() {
return -1
}
defer mutex.Unlock()

file, err := os.Open(filepath)
if err != nil {
return startPos
Expand All @@ -90,32 +98,39 @@ func (c *Controller) readFromPos(filepath string, startPos int64, onExecute func

_, _ = file.Seek(startPos, 0) //nolint:errcheck

scanner := bufio.NewScanner(file)
// Support long lines and treat both \n and \r as delimiters to keep progress output.
scanner.Buffer(make([]byte, 0, 64*1024), 5*1024*1024) // 5MB max token
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
for i, b := range data {
if b == '\n' || b == '\r' {
// Treat \r\n as a single delimiter to avoid empty tokens.
if b == '\r' && i+1 < len(data) && data[i+1] == '\n' {
return i + 2, data[:i], nil
}
return i + 1, data[:i], nil
reader := bufio.NewReader(file)
var buffer bytes.Buffer
var currentPos int64 = startPos

for {
b, err := reader.ReadByte()
if err != nil {
if err == io.EOF {
// If buffer has content but no newline, it's an incomplete line, don't output
break
}
break
}
if atEOF && len(data) > 0 {
return len(data), data, nil
currentPos++

// Check if it's a line terminator (\n or \r)
if b == '\n' || b == '\r' {
// If buffer has content, output this line
if buffer.Len() > 0 {
onExecute(buffer.String())
buffer.Reset()
}
// Skip line terminator
continue
}
return 0, nil, nil
})

for scanner.Scan() {
onExecute(scanner.Text())
}
if err := scanner.Err(); err != nil {
return startPos
buffer.WriteByte(b)
}

endPos, _ := file.Seek(0, 1)
// If the last read position doesn't end with a newline, return the buffer start position
if buffer.Len() > 0 {
return currentPos - int64(buffer.Len())
}
return endPos
}
9 changes: 6 additions & 3 deletions components/execd/pkg/runtime/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand All @@ -32,14 +33,16 @@ func TestReadFromPos_SplitsOnCRAndLF(t *testing.T) {
tmp := t.TempDir()
logFile := filepath.Join(tmp, "stdout.log")

mutex := &sync.Mutex{}

initial := "line1\nprog 10%\rprog 20%\rprog 30%\nlast\n"
if err := os.WriteFile(logFile, []byte(initial), 0o644); err != nil {
t.Fatalf("write initial file: %v", err)
}

var got []string
c := &Controller{}
nextPos := c.readFromPos(logFile, 0, func(s string) { got = append(got, s) })
nextPos := c.readFromPos(mutex, logFile, 0, func(s string) { got = append(got, s) })

want := []string{"line1", "prog 10%", "prog 20%", "prog 30%", "last"}
if len(got) != len(want) {
Expand All @@ -64,7 +67,7 @@ func TestReadFromPos_SplitsOnCRAndLF(t *testing.T) {
_ = f.Close()

got = got[:0]
c.readFromPos(logFile, nextPos, func(s string) { got = append(got, s) })
c.readFromPos(mutex, logFile, nextPos, func(s string) { got = append(got, s) })
want = []string{"tail1", "tail2"}
if len(got) != len(want) {
t.Fatalf("incremental token count: got %d want %d", len(got), len(want))
Expand All @@ -88,7 +91,7 @@ func TestReadFromPos_LongLine(t *testing.T) {

var got []string
c := &Controller{}
c.readFromPos(logFile, 0, func(s string) { got = append(got, s) })
c.readFromPos(&sync.Mutex{}, logFile, 0, func(s string) { got = append(got, s) })

if len(got) != 1 {
t.Fatalf("expected one token, got %d", len(got))
Expand Down
Loading