Skip to content

Commit e94c116

Browse files
committed
ringbuf: add zero-copy consumer APIs
The current ringbuf consumer APIs Read/ReadInto always copy samples into a user-provided buffer, see https://github.com/cilium/ebpf/blob/v0.20.0/ringbuf/ring.go#L96. For callers that consume data synchronizely, this extra copy is unnecessary. This change adds a zero-copy consumer API: - Peek / PeekInto – return a view into the mmap’d ring buffer without advancing the consumer position. - Consume – advance the consumer position once the view has been processed. Existing Read/ReadInto semantics are unchanged and continue to work as before. A preliminary microbenchmark [1] shows zero-copy advantage grows with larger records because copy throughput falls while view throughput stays roughly flat. Single-core run on CPU 2, ring size 1 GiB, Go 1.24.4. Throughput is in million events/second (Mev/s); "speedup" is zero-copy / copy. | event-size (B) | events/run | copy (Mev/s) | zero-copy (Mev/s) | speedup | | --- | --- | --- | --- | --- | | 128 | 7,895,160 | 45.63 | 49.83 | 1.09x | | 512 | 2,064,888 | 25.03 | 34.94 | 1.40x | | 1024 | 1,040,447 | 8.90 | 34.94 | 3.93x | | 2048 | 522,247 | 4.57 | 29.56 | 6.47x | [1] https://github.com/jschwinger233/bpf_ringbuf_zc_benchmark Signed-off-by: graymon <[email protected]>
1 parent 3d4ca80 commit e94c116

File tree

3 files changed

+130
-16
lines changed

3 files changed

+130
-16
lines changed

ringbuf/reader.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"sync"
8+
"sync/atomic"
89
"time"
910
"unsafe"
1011

@@ -46,6 +47,14 @@ type Record struct {
4647
Remaining int
4748
}
4849

50+
// View is a zero-copy record returned by Peek/PeekInto until Consume is called.
51+
type View struct {
52+
Sample []byte
53+
Remaining int
54+
55+
nextCons uintptr
56+
}
57+
4958
// Reader allows reading bpf_ringbuf_output
5059
// from user space.
5160
type Reader struct {
@@ -144,6 +153,32 @@ func (r *Reader) Read() (Record, error) {
144153

145154
// ReadInto is like Read except that it allows reusing Record and associated buffers.
146155
func (r *Reader) ReadInto(rec *Record) error {
156+
return r.readLocked(func() error {
157+
return r.ring.readRecord(rec)
158+
})
159+
}
160+
161+
// Peek returns the next sample without advancing the consumer position.
162+
func (r *Reader) Peek() (View, error) {
163+
var v View
164+
err := r.PeekInto(&v)
165+
return v, err
166+
}
167+
168+
// PeekInto is like Peek but stores the sample in view for reuse.
169+
func (r *Reader) PeekInto(view *View) error {
170+
return r.readLocked(func() error {
171+
return r.ring.readView(view)
172+
})
173+
}
174+
175+
// Consume advances the consumer position after a successful Peek/PeekInto.
176+
func (r *Reader) Consume(view *View) {
177+
atomic.StoreUintptr(r.ring.cons_pos, view.nextCons)
178+
}
179+
180+
// readLocked drives the polling / data-availability loop shared by Record and View reads.
181+
func (r *Reader) readLocked(read func() error) error {
147182
r.mu.Lock()
148183
defer r.mu.Unlock()
149184

@@ -171,7 +206,7 @@ func (r *Reader) ReadInto(rec *Record) error {
171206
}
172207

173208
for {
174-
err := r.ring.readRecord(rec)
209+
err := read()
175210
// Not using errors.Is which is quite a bit slower
176211
// For a tight loop it might make a difference
177212
if err == errBusy {

ringbuf/reader_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,56 @@ func BenchmarkReadInto(b *testing.B) {
384384
}
385385
}
386386
}
387+
388+
func TestPeek(t *testing.T) {
389+
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
390+
391+
prog, events := mustOutputSamplesProg(t,
392+
sampleMessage{size: 5, flags: 0},
393+
sampleMessage{size: 7, flags: 0},
394+
)
395+
mustRun(t, prog)
396+
397+
rd, err := NewReader(events)
398+
if err != nil {
399+
t.Fatal(err)
400+
}
401+
defer rd.Close()
402+
403+
view1, err := rd.Peek()
404+
qt.Assert(t, qt.IsNil(err))
405+
qt.Assert(t, qt.Equals(len(view1.Sample), 5))
406+
rd.Consume(&view1)
407+
408+
view2, err := rd.Peek()
409+
qt.Assert(t, qt.IsNil(err))
410+
qt.Assert(t, qt.Equals(len(view2.Sample), 7))
411+
rd.Consume(&view2)
412+
}
413+
414+
func TestPeekInto(t *testing.T) {
415+
testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
416+
417+
prog, events := mustOutputSamplesProg(t,
418+
sampleMessage{size: 5, flags: 0},
419+
sampleMessage{size: 7, flags: 0},
420+
)
421+
mustRun(t, prog)
422+
423+
rd, err := NewReader(events)
424+
if err != nil {
425+
t.Fatal(err)
426+
}
427+
defer rd.Close()
428+
429+
var view View
430+
err = rd.PeekInto(&view)
431+
qt.Assert(t, qt.IsNil(err))
432+
qt.Assert(t, qt.Equals(len(view.Sample), 5))
433+
rd.Consume(&view)
434+
435+
err = rd.PeekInto(&view)
436+
qt.Assert(t, qt.IsNil(err))
437+
qt.Assert(t, qt.Equals(len(view.Sample), 7))
438+
rd.Consume(&view)
439+
}

ringbuf/ring.go

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,17 @@ func (rr *ringReader) AvailableBytes() uint64 {
4141
return uint64(prod - cons)
4242
}
4343

44-
// Read a record from an event ring.
45-
func (rr *ringReader) readRecord(rec *Record) error {
44+
// readSample returns a zero-copy view into the next sample, together with the
45+
// consumer position that should be stored to release the data.
46+
func (rr *ringReader) readSample() (sample []byte, remaining int, nextCons uintptr, err error) {
4647
prod := atomic.LoadUintptr(rr.prod_pos)
4748
cons := atomic.LoadUintptr(rr.cons_pos)
4849

4950
for {
5051
if remaining := prod - cons; remaining == 0 {
51-
return errEOR
52+
return nil, 0, 0, errEOR
5253
} else if remaining < sys.BPF_RINGBUF_HDR_SZ {
53-
return fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
54+
return nil, 0, 0, fmt.Errorf("read record header: %w", io.ErrUnexpectedEOF)
5455
}
5556

5657
// read the len field of the header atomically to ensure a happens before
@@ -65,15 +66,15 @@ func (rr *ringReader) readRecord(rec *Record) error {
6566
// the next sample in the ring is not committed yet so we
6667
// exit without storing the reader/consumer position
6768
// and start again from the same position.
68-
return errBusy
69+
return nil, 0, 0, errBusy
6970
}
7071

7172
cons += sys.BPF_RINGBUF_HDR_SZ
7273

7374
// Data is always padded to 8 byte alignment.
7475
dataLenAligned := uintptr(internal.Align(header.dataLen(), 8))
7576
if remaining := prod - cons; remaining < dataLenAligned {
76-
return fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
77+
return nil, 0, 0, fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
7778
}
7879

7980
start = cons & rr.mask
@@ -87,15 +88,40 @@ func (rr *ringReader) readRecord(rec *Record) error {
8788
continue
8889
}
8990

90-
if n := header.dataLen(); cap(rec.RawSample) < n {
91-
rec.RawSample = make([]byte, n)
92-
} else {
93-
rec.RawSample = rec.RawSample[:n]
94-
}
91+
end := int(start) + header.dataLen()
92+
return rr.ring[start:end], int(prod - cons), cons, nil
93+
}
94+
}
95+
96+
// Read a record from an event ring.
97+
func (rr *ringReader) readRecord(rec *Record) error {
98+
sample, remaining, nextCons, err := rr.readSample()
99+
if err != nil {
100+
return err
101+
}
95102

96-
copy(rec.RawSample, rr.ring[start:])
97-
rec.Remaining = int(prod - cons)
98-
atomic.StoreUintptr(rr.cons_pos, cons)
99-
return nil
103+
if n := len(sample); cap(rec.RawSample) < n {
104+
rec.RawSample = make([]byte, n)
105+
} else {
106+
rec.RawSample = rec.RawSample[:n]
100107
}
108+
109+
copy(rec.RawSample, sample)
110+
rec.Remaining = remaining
111+
atomic.StoreUintptr(rr.cons_pos, nextCons)
112+
return nil
113+
}
114+
115+
// readView is the zero-copy variant of readRecord. It leaves advancing the
116+
// consumer position to the caller.
117+
func (rr *ringReader) readView(view *View) error {
118+
sample, remaining, nextCons, err := rr.readSample()
119+
if err != nil {
120+
return err
121+
}
122+
123+
view.Sample = sample
124+
view.Remaining = remaining
125+
view.nextCons = nextCons
126+
return nil
101127
}

0 commit comments

Comments
 (0)