Skip to content

Commit

Permalink
fix test race, appease linter
Browse files Browse the repository at this point in the history
  • Loading branch information
lestrrat committed Jan 6, 2021
1 parent f103b33 commit 171171d
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 52 deletions.
2 changes: 0 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ func IsBufferFull(e error) bool {
if cerr, ok := e.(causer); ok {
e = cerr.Cause()
}

e = nil
}
return false
}
Expand Down
97 changes: 69 additions & 28 deletions fluent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ type server struct {
cleanup func()
done chan struct{}
listener net.Listener
mu sync.RWMutex
ready chan struct{}
useJSON bool
Network string
Address string
Payload []*fluent.Message
payload []*fluent.Message
}

func newServer(useJSON bool) (*server, error) {
Expand Down Expand Up @@ -75,6 +76,42 @@ func (s *server) Ready() <-chan struct{} {
func (s *server) Done() <-chan struct{} {
return s.done
}
func (s *server) PayloadAt(i int) *fluent.Message {
s.mu.RLock()
m := s.payload[i]
s.mu.RUnlock()
return m
}

func (s *server) PayloadCount() int {
s.mu.RLock()
c := len(s.payload)
s.mu.RUnlock()
return c
}

func (s *server) PayloadIter() <-chan *fluent.Message {
s.mu.RLock()
ch := make(chan *fluent.Message, len(s.payload))
for _, p := range s.payload {
ch <- p
}
s.mu.RUnlock()
close(ch)
return ch
}

func (s *server) TruncatePayload() {
s.mu.Lock()
s.payload = s.payload[:0]
s.mu.Unlock()
}

func (s *server) AddPayload(v *fluent.Message) {
s.mu.Lock()
s.payload = append(s.payload, v)
s.mu.Unlock()
}

func (s *server) Run(ctx context.Context) {
if pdebug.Enabled {
Expand All @@ -83,13 +120,11 @@ func (s *server) Run(ctx context.Context) {
defer close(s.done)

go func() {
select {
case <-ctx.Done():
if pdebug.Enabled {
pdebug.Printf("context.Context is done, closing listeners")
}
s.listener.Close()
<-ctx.Done()
if pdebug.Enabled {
pdebug.Printf("context.Context is done, closing listeners")
}
s.listener.Close()
}()

if pdebug.Enabled {
Expand Down Expand Up @@ -210,13 +245,14 @@ func (s *server) Run(ctx context.Context) {
}
v.Record = newMap
}
s.Payload = append(s.Payload, v)
s.AddPayload(v)
}
}
}

func TestConnectOnStart(t *testing.T) {
for _, buffered := range []bool{true, false} {
buffered := buffered
t.Run(fmt.Sprintf("failure case, buffered=%t", buffered), func(t *testing.T) {
// find a port that is not available (this may be timing dependent)
var dialer net.Dialer
Expand Down Expand Up @@ -259,6 +295,7 @@ func TestConnectOnStart(t *testing.T) {
<-s.Ready()

for _, buffered := range []bool{true, false} {
buffered := buffered
t.Run(fmt.Sprintf("normal case, buffered=%t", buffered), func(t *testing.T) {
client, err := fluent.New(
fluent.WithNetwork(s.Network),
Expand Down Expand Up @@ -320,7 +357,7 @@ func TestTagPrefix(t *testing.T) {

<-s.Ready()

client, err := fluent.New(
client, _ := fluent.New(
fluent.WithNetwork(s.Network),
fluent.WithAddress(s.Address),
fluent.WithTagPrefix("test"),
Expand All @@ -329,9 +366,9 @@ func TestTagPrefix(t *testing.T) {
return
}

client.Shutdown(nil)
client.Shutdown(context.TODO())

for _, p := range s.Payload {
for p := range s.PayloadIter() {
if !assert.Equal(t, "test.tag_name", p.Tag, "tag should have prefix") {
return
}
Expand All @@ -345,7 +382,7 @@ func TestBufferFull(t *testing.T) {
}
defer s.Close()

client, err := fluent.New(
client, _ := fluent.New(
fluent.WithNetwork(s.Network),
fluent.WithAddress(s.Address),
fluent.WithBufferLimit(256),
Expand Down Expand Up @@ -406,8 +443,8 @@ func TestBufferFull(t *testing.T) {
t.Errorf("timed out while waiting for the server to process requests")
return // Don't forget to return
case <-tick.C:
if len(s.Payload) == count {
t.Logf("Got expected %d messages in the server", len(s.Payload))
if c := s.PayloadCount(); c == count {
t.Logf("Got expected %d messages in the server", c)
loop = false
}
}
Expand All @@ -418,7 +455,7 @@ func TestBufferFull(t *testing.T) {
pdebug.Printf("Writing one more after draining")
}
// See if we can still write after the buffer has been drained
s.Payload = s.Payload[0:0]
s.TruncatePayload()
if !assert.NoError(t, client.Post("tag_name", map[string]interface{}{"foo": 1}, fluent.WithSyncAppend(true)), "writing after the buffer has been drained should succeed") {
return
}
Expand All @@ -434,8 +471,8 @@ func TestBufferFull(t *testing.T) {
case <-timeout.C:
t.Errorf("timed out while waiting for the server to process requests")
case <-tick.C:
if len(s.Payload) == 1 {
t.Logf("Got expected %d messages in the server", len(s.Payload))
if c := s.PayloadCount(); c == 1 {
t.Logf("Got expected %d messages in the server", c)
loop = false
}
}
Expand All @@ -453,6 +490,7 @@ func (msg *badmsgpack) EncodeMsgpack(_ *msgpack.Encoder) error {

func TestPostSync(t *testing.T) {
for _, syncAppend := range []bool{true, false} {
syncAppend := syncAppend
t.Run("sync="+strconv.FormatBool(syncAppend), func(t *testing.T) {
s, err := newServer(false)
if !assert.NoError(t, err, "newServer should succeed") {
Expand All @@ -462,6 +500,7 @@ func TestPostSync(t *testing.T) {

// This is just to stop the server
sctx, scancel := context.WithCancel(context.Background())
defer scancel()

go s.Run(sctx)

Expand Down Expand Up @@ -503,8 +542,7 @@ func TestPostSync(t *testing.T) {
return
}
}
client.Shutdown(nil)
scancel()
client.Shutdown(context.TODO())
})
}
}
Expand All @@ -524,6 +562,7 @@ func TestPostRoundtrip(t *testing.T) {
}

for _, buffered := range []bool{true, false} {
buffered := buffered
t.Run(fmt.Sprintf("buffered=%t", buffered), func(t *testing.T) {
var options []fluent.Option
if !buffered {
Expand Down Expand Up @@ -555,6 +594,8 @@ func TestPostRoundtrip(t *testing.T) {

// This is just to stop the server
sctx, scancel := context.WithCancel(context.Background())
defer scancel()

go s.Run(sctx)

<-s.Ready()
Expand All @@ -568,15 +609,15 @@ func TestPostRoundtrip(t *testing.T) {
if !assert.NoError(t, err, "failed to create fluent client") {
return
}
defer client.Shutdown(nil)
defer client.Shutdown(ctx)

for _, data := range testcases {
err := client.Post("tag_name", data, fluent.WithTimestamp(time.Unix(1482493046, 0).UTC()))
if !assert.NoError(t, err, "client.Post should succeed") {
return
}
}
client.Shutdown(nil)
client.Shutdown(ctx)

time.Sleep(time.Second)
scancel()
Expand All @@ -591,7 +632,7 @@ func TestPostRoundtrip(t *testing.T) {
case <-s.Done():
}

if !assert.Len(t, s.Payload, len(testcases)) {
if !assert.Equal(t, s.PayloadCount(), len(testcases)) {
return
}

Expand Down Expand Up @@ -619,11 +660,10 @@ func TestPostRoundtrip(t *testing.T) {
}
}

if !assert.Equal(t, &fluent.Message{Tag: "tag_name", Time: fluent.EventTime{Time: time.Unix(1482493046, 0).UTC()}, Record: payload}, s.Payload[i]) {
if !assert.Equal(t, &fluent.Message{Tag: "tag_name", Time: fluent.EventTime{Time: time.Unix(1482493046, 0).UTC()}, Record: payload}, s.PayloadAt(i)) {
return
}
}

})
}
})
Expand All @@ -632,6 +672,7 @@ func TestPostRoundtrip(t *testing.T) {

func TestPing(t *testing.T) {
for _, buffered := range []bool{true, false} {
buffered := buffered
t.Run(fmt.Sprintf("buffered=%t", buffered), func(t *testing.T) {
t.Run("Ping with no server", func(t *testing.T) {
// find a port that is not available (this may be timing dependent)
Expand Down Expand Up @@ -679,7 +720,7 @@ func TestPing(t *testing.T) {

<-s.Ready()

client, err := fluent.New(
client, _ := fluent.New(
fluent.WithNetwork(s.Network),
fluent.WithAddress(s.Address),
fluent.WithBuffered(buffered),
Expand All @@ -690,15 +731,15 @@ func TestPing(t *testing.T) {
return
}

client.Shutdown(nil)
client.Shutdown(context.TODO())

// timing sensitive :/ we need to give the server enough time to receive
// the message before canceling it via scancel
time.Sleep(100*time.Millisecond)
time.Sleep(100 * time.Millisecond)
scancel()
<-s.Done()

if !assert.Len(t, s.Payload, 1, "expected 1 message") {
if !assert.Equal(t, s.PayloadCount(), 1, "expected 1 message") {
return
}
})
Expand Down
1 change: 1 addition & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Client interface {
Shutdown(context.Context) error
}

//nolint:maligned
// Buffered is a Client that buffers incoming messages, and sends them
// asynchrnously when it can.
type Buffered struct {
Expand Down
8 changes: 6 additions & 2 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ func (m *Message) MarshalJSON() ([]byte, error) {

buf.WriteByte('[')

enc.Encode(m.Tag)
if err := enc.Encode(m.Tag); err != nil {
return nil, errors.Wrap(err, `failed to encode tag`)
}
buf.Truncate(buf.Len() - 1)

buf.WriteByte(',')

enc.Encode(m.Time.Unix())
if err := enc.Encode(m.Time.Unix()); err != nil {
return nil, errors.Wrap(err, `failed to encode time`)
}
buf.Truncate(buf.Len() - 1)

buf.WriteByte(',')
Expand Down
Loading

0 comments on commit 171171d

Please sign in to comment.