Skip to content

Fixed #428 by making reader.read respect context closure #429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1337,25 +1337,48 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
return
}

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
r.stats.fetches.observe(1)
r.stats.offset.observe(offset)
// readBatch wraps the call to conn.ReadBatchWith to make it interruptible.
// Conn methods are written in a non-interruptible style, so the only way to
// interrupt them is to close the connection in another goroutine.
func (r *reader) readBatch(ctx context.Context, conn *Conn) (*Batch, error) {
done := make(chan struct{})
defer close(done)

t0 := time.Now()
conn.SetReadDeadline(t0.Add(r.maxWait))
go func() {
select {
case <-ctx.Done():
conn.Close()
case <-done:
return
}
}()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to wait for this goroutine to finish before returning from the readBatch function; otherwise, the goroutine will keep running asynchronously and may close the connection if ctx is canceled after the function has returned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that spawning this goroutine for every batch read by the consumer may greatly increase pressure on the Go scheduler and GC, which could impact the performance of high-throughput consumers.

Could we add a benchmark to evaluate the impact of this change?


batch := conn.ReadBatchWith(ReadBatchConfig{
MinBytes: r.minBytes,
MaxBytes: r.maxBytes,
IsolationLevel: r.isolationLevel,
})
return batch, ctx.Err()
}

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
r.stats.fetches.observe(1)
r.stats.offset.observe(offset)

t0 := time.Now()
conn.SetReadDeadline(t0.Add(r.maxWait))

batch, err := r.readBatch(ctx, conn)
if err != nil {
return offset, err
}

highWaterMark := batch.HighWaterMark()

t1 := time.Now()
r.stats.waitTime.observeDuration(t1.Sub(t0))

var msg Message
var err error
var size int64
var bytes int64

Expand Down
25 changes: 25 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,3 +1313,28 @@ func getOffsets(t *testing.T, config ReaderConfig) offsetFetchResponseV1 {

return offsets
}

// TestReaderClose checks that closing a Reader stays fast after a
// FetchMessage call has been cut short by its context deadline.
//
// The reader is configured with a MaxWait (2s) longer than the fetch
// context's timeout (1s), so FetchMessage is expected to return
// context.DeadlineExceeded. The subsequent Close call must then complete
// within 100ms.
//
// NOTE(review): presumably the freshly generated topic has no messages, so
// the fetch can only end via the deadline — confirm against makeTopic.
func TestReaderClose(t *testing.T) {
	t.Parallel()

	reader := NewReader(ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   makeTopic(),
		MaxWait: 2 * time.Second,
	})
	// Safety net; the explicit Close further down is the one being timed.
	defer reader.Close()

	fetchCtx, cancelFetch := context.WithTimeout(context.Background(), time.Second)
	defer cancelFetch()

	// The fetch must give up because of the context deadline.
	if _, fetchErr := reader.FetchMessage(fetchCtx); fetchErr != context.DeadlineExceeded {
		t.Errorf("bad err: %v", fetchErr)
	}

	// Time only the Close call itself.
	closeStart := time.Now()
	reader.Close()
	if elapsed := time.Since(closeStart); elapsed > 100*time.Millisecond {
		t.Errorf("r.Close took too long")
	}
}