Skip to content

Commit

Permalink
fix raceconditions in tail library
Browse files Browse the repository at this point in the history
This fixes a couple of race conditions in the tail library, combining
the ideas of nxadm/tail#70 and
nxadm/tail#71.

- Currently when StopAtEOF is called, and we previously encountered an
  EOF already, we stop reading the file immediately. However when
  tailing a file, new data might have become available in the meantime,
  before the StopAtEOF is called. The watcher might however not have
  notified us about that yet.

  Instead of exiting immediately if that happens, and leaving the data
  that's already in the file unread, continue iterating until we get the
  next EOF, as we can be reasonably sure that that's the EOF the user
  meant to stop at, making sure to read all the data that has been
  written by the time StopAtEOF is called.

- When a StopAtEOF() is called the code should continue to send all
  lines to the Lines channel. The issue here is if the caller is not
  ready to receive a new line the code blocks as it is using a
  unbuffered channel. However <-tail.Dying() would return in this case
  so the line was skipped. This means that the caller did not get all
  lines until EOF. Now we still want to skip in case any other reason
  for kill was given therefore add special logic to only not read the
  Dying channel on the EOF case.

  The one downside is that StopAtEOF() could block forever if the
  caller never reads new Lines but this seems logical to me. If the
  caller wants to wait for EOF but never reads remaining Lines this
  would be a bug on their end.

Co-authored-by: Paul Holzinger <[email protected]>
  • Loading branch information
tgummerer and Luap99 committed Dec 13, 2024
1 parent 258035f commit 23d265a
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 14 deletions.
28 changes: 27 additions & 1 deletion sdk/go/common/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (tail *Tail) tailFileSync() {

tail.openReader()

stopOnNextEOF := false
// Read line by line.
for {
// do not seek in named pipes
Expand Down Expand Up @@ -339,11 +340,24 @@ func (tail *Tail) tailFileSync() {
}
}

if stopOnNextEOF {
return
}

// When EOF is reached, wait for more data to become
// available. Wait strategy is based on the `tail.watcher`
// implementation (inotify or polling).
err := tail.waitForChanges()
if err != nil {
// When StopAtEOF() is called, we
// might have more data to read, that
// the filewatcher might not have
// notified us about. Continue
// reading until we found an EOF
// again, and then exit.
if err == ErrStop && tail.Err() == errStopAtEOF {
stopOnNextEOF = true
}
if err != ErrStop {
tail.Kill(err)
}
Expand Down Expand Up @@ -448,12 +462,24 @@ func (tail *Tail) sendLine(line string) bool {
lines = util.PartitionString(line, tail.MaxLineSize)
}

// This is a bit weird here, when a users requests stopAtEof we
// must keep sending all lines however <-tail.Dying() will return
// immediately at this point so the select below may not have
// chance to send the line if the reader side has is not yet ready.
// But if StopAtEOF was not set and it is a "normal" Kill then we
// should exit right away still thus the special logic here.
earlyExitChan := tail.Dying()
if tail.Err() == errStopAtEOF {
// Note that receive from a nil channel blocks forever so
// below we know it can only take the tail.Lines case.
earlyExitChan = nil
}
for _, line := range lines {
tail.lineNum++
offset, _ := tail.Tell()
select {
case tail.Lines <- &Line{line, tail.lineNum, SeekInfo{Offset: offset}, now, nil}:
case <-tail.Dying():
case <-earlyExitChan:
return true
}
}
Expand Down
60 changes: 47 additions & 13 deletions sdk/go/common/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestWaitsForFileToExistRelativePath(t *testing.T) {
go tailTest.VerifyTailOutput(tail, []string{"hello", "world"}, false)

<-time.After(100 * time.Millisecond)
if err := ioutil.WriteFile("test.txt", []byte("hello\nworld\n"), 0600); err != nil {
if err := ioutil.WriteFile("test.txt", []byte("hello\nworld\n"), 0o600); err != nil {
tailTest.Fatal(err)
}
tailTest.Cleanup(tail, true)
Expand Down Expand Up @@ -313,7 +313,8 @@ func TestReSeekWithCursor(t *testing.T) {
Config{Follow: true, ReOpen: false, Poll: false})

go tailTest.VerifyTailOutputUsingCursor(tail, []string{
"a really long string goes here", "hello", "world", "but", "not", "me"}, false)
"a really long string goes here", "hello", "world", "but", "not", "me",
}, false)

// truncate now
<-time.After(100 * time.Millisecond)
Expand All @@ -336,7 +337,8 @@ func TestRateLimiting(t *testing.T) {
tailTest.CreateFile("test.txt", "hello\nworld\nagain\nextra\n")
config := Config{
Follow: true,
RateLimiter: ratelimiter.NewLeakyBucket(2, time.Second)}
RateLimiter: ratelimiter.NewLeakyBucket(2, time.Second),
}
leakybucketFull := "Too much log activity; waiting a second before resuming tailing"
tail := tailTest.StartTail("test.txt", config)

Expand All @@ -345,7 +347,8 @@ func TestRateLimiting(t *testing.T) {
"hello", "world", "again",
leakybucketFull,
"more", "data",
leakybucketFull}, false)
leakybucketFull,
}, false)

// Add more data only after reasonable delay.
<-time.After(1200 * time.Millisecond)
Expand All @@ -365,7 +368,8 @@ func TestTell(t *testing.T) {
tailTest.CreateFile("test.txt", "hello\nworld\nagain\nmore\n")
config := Config{
Follow: false,
Location: &SeekInfo{0, io.SeekStart}}
Location: &SeekInfo{0, io.SeekStart},
}
tail := tailTest.StartTail("test.txt", config)
// read one line
line := <-tail.Lines
Expand All @@ -380,7 +384,8 @@ func TestTell(t *testing.T) {

config = Config{
Follow: false,
Location: &SeekInfo{offset, io.SeekStart}}
Location: &SeekInfo{offset, io.SeekStart},
}
tail = tailTest.StartTail("test.txt", config)
for l := range tail.Lines {
// it may readed one line in the chan(tail.Lines),
Expand Down Expand Up @@ -422,6 +427,26 @@ func TestBlockUntilExists(t *testing.T) {
tail.Cleanup()
}

func TestFollowUntilEof(t *testing.T) {
tailTest, cleanup := NewTailTest("incomplete-lines-no-follow", t)
defer cleanup()
filename := "test.txt"
config := Config{
Follow: false,
}
tailTest.CreateFile(filename, "hello\nworld\n")
tail := tailTest.StartTail(filename, config)

// StopAtEOF blocks until the read is done and in order to do so
// we have to drain the lines channel first which ReadLinesWithError does.
go tail.StopAtEOF()
tailTest.ReadLinesWithError(tail, []string{"hello", "world"}, false, errStopAtEOF)

tailTest.RemoveFile(filename)
tail.Stop()
tail.Cleanup()
}

func maxLineSize(t *testing.T, follow bool, fileContent string, expected []string) {
tailTest, cleanup := NewTailTest("maxlinesize", t)
defer cleanup()
Expand Down Expand Up @@ -637,7 +662,8 @@ func reSeek(t *testing.T, poll bool) {
Config{Follow: true, ReOpen: false, Poll: poll})

go tailTest.VerifyTailOutput(tail, []string{
"a really long string goes here", "hello", "world", "h311o", "w0r1d", "endofworld"}, false)
"a really long string goes here", "hello", "world", "h311o", "w0r1d", "endofworld",
}, false)

// truncate now
<-time.After(100 * time.Millisecond)
Expand Down Expand Up @@ -677,14 +703,14 @@ func NewTailTest(name string, t *testing.T) (TailTest, func()) {
}

func (t TailTest) CreateFile(name string, contents string) {
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0600)
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0o600)
if err != nil {
t.Fatal(err)
}
}

func (t TailTest) AppendToFile(name string, contents string) {
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0600|os.ModeAppend)
err := ioutil.WriteFile(t.path+"/"+name, []byte(contents), 0o600|os.ModeAppend)
if err != nil {
t.Fatal(err)
}
Expand All @@ -707,7 +733,7 @@ func (t TailTest) RenameFile(oldname string, newname string) {
}

func (t TailTest) AppendFile(name string, contents string) {
f, err := os.OpenFile(t.path+"/"+name, os.O_APPEND|os.O_WRONLY, 0600)
f, err := os.OpenFile(t.path+"/"+name, os.O_APPEND|os.O_WRONLY, 0o600)
if err != nil {
t.Fatal(err)
}
Expand All @@ -719,7 +745,7 @@ func (t TailTest) AppendFile(name string, contents string) {
}

func (t TailTest) TruncateFile(name string, contents string) {
f, err := os.OpenFile(t.path+"/"+name, os.O_TRUNC|os.O_WRONLY, 0600)
f, err := os.OpenFile(t.path+"/"+name, os.O_TRUNC|os.O_WRONLY, 0o600)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -765,6 +791,14 @@ func (t TailTest) VerifyTailOutputUsingCursor(tail *Tail, lines []string, expect
}

func (t TailTest) ReadLines(tail *Tail, lines []string, useCursor bool) {
t.readLines(tail, lines, useCursor, nil)
}

func (t TailTest) ReadLinesWithError(tail *Tail, lines []string, useCursor bool, err error) {
t.readLines(tail, lines, useCursor, err)
}

func (t TailTest) readLines(tail *Tail, lines []string, useCursor bool, expectedErr error) {
cursor := 1

for _, line := range lines {
Expand All @@ -773,8 +807,8 @@ func (t TailTest) ReadLines(tail *Tail, lines []string, useCursor bool) {
if !ok {
// tail.Lines is closed and empty.
err := tail.Err()
if err != nil {
t.Fatalf("tail ended with error: %v", err)
if err != expectedErr {
t.Fatalf("tail ended with unexpected error: %v", err)
}
t.Fatalf("tail ended early; expecting more: %v", lines[cursor:])
}
Expand Down

0 comments on commit 23d265a

Please sign in to comment.