Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions connsocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package transport

import (
"net"
"time"
)

type NetConnSocket interface {
net.Conn

ReadWithAttributes(p []byte, attr *PacketAttributes) (n int, err error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better if we unify attribute types in the APIs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having a generic packet attributes type with variable buffer length would address this comment right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah a generic packet attributes will be cool, good idea.

}

type PacketConnSocket interface {
net.PacketConn

ReadFromWithAttributes(p []byte, attr *PacketAttributes) (n int, addr net.Addr, err error)
}

// NetConnToNetConnSocket wraps a net.Conn and implements the PacketStream interface by delegating
// calls to the underlying connection. ReadWithAttributes delegates to Read and
// ignores the provided PacketAttributes.
type NetConnToNetConnSocket struct {
conn net.Conn
}

// NewNetConnToNetConnSocket returns a new Proxy that wraps the provided net.Conn.
func NewNetConnToNetConnSocket(conn net.Conn) *NetConnToNetConnSocket {
return &NetConnToNetConnSocket{conn: conn}
}

// ReadWithAttributes reads from the underlying connection and ignores attributes.
func (p *NetConnToNetConnSocket) ReadWithAttributes(b []byte, _ *PacketAttributes) (int, error) {
return p.conn.Read(b)
}

// Delegate net.Conn methods to the underlying connection.

Check failure on line 40 in connsocket.go

View workflow job for this annotation

GitHub Actions / lint / Go

ST1020: comment on exported method Read should be of the form "Read ..." (staticcheck)
func (p *NetConnToNetConnSocket) Read(b []byte) (int, error) { return p.conn.Read(b) }
func (p *NetConnToNetConnSocket) Write(b []byte) (int, error) { return p.conn.Write(b) }
func (p *NetConnToNetConnSocket) Close() error { return p.conn.Close() }
func (p *NetConnToNetConnSocket) LocalAddr() net.Addr { return p.conn.LocalAddr() }
func (p *NetConnToNetConnSocket) RemoteAddr() net.Addr { return p.conn.RemoteAddr() }
func (p *NetConnToNetConnSocket) SetDeadline(t time.Time) error { return p.conn.SetDeadline(t) }
func (p *NetConnToNetConnSocket) SetReadDeadline(t time.Time) error { return p.conn.SetReadDeadline(t) }
func (p *NetConnToNetConnSocket) SetWriteDeadline(t time.Time) error {
return p.conn.SetWriteDeadline(t)
}
42 changes: 42 additions & 0 deletions packetattributes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// SPDX-FileCopyrightText: 2025 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package transport

const MaxAttributesLen = 1024

type PacketAttributes struct {
Buffer []byte
BytesCopied int
}

func (p *PacketAttributes) Reset() {
p.BytesCopied = 0
}

func NewPacketAttributesWithLen(length int) *PacketAttributes {
buff := make([]byte, length)

return &PacketAttributes{
Buffer: buff,
BytesCopied: 0,
}
}

func (p *PacketAttributes) Clone() *PacketAttributes {
b := make([]byte, p.BytesCopied)
copy(b, p.Buffer)
return &PacketAttributes{

Check failure on line 29 in packetattributes.go

View workflow job for this annotation

GitHub Actions / lint / Go

return with no blank line before (nlreturn)
Buffer: b,
BytesCopied: p.BytesCopied,
}
}

// Returns the read buffer. Just like when calling a read on a socket we have n, err := conn.Read(buf)

Check failure on line 35 in packetattributes.go

View workflow job for this annotation

GitHub Actions / lint / Go

ST1020: comment on exported method GetReadPacketAttributes should be of the form "GetReadPacketAttributes ..." (staticcheck)
// and the read bytes are in buf[:n], we should use this method after calling the ReadWithAttributes method.
func (p *PacketAttributes) GetReadPacketAttributes() *PacketAttributes {
return &PacketAttributes{
Buffer: p.Buffer[:p.BytesCopied],
BytesCopied: p.BytesCopied,
}
}
191 changes: 144 additions & 47 deletions packetio/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
"sync"
"time"

"github.com/pion/transport/v3"
"github.com/pion/transport/v3/deadline"
)

var errPacketTooBig = errors.New("packet too big")

Check failure on line 17 in packetio/buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

File is not properly formatted (gofumpt)
var errPacketAttributesBufferTooShort = errors.New("packet attributes buffer too short")

// BufferPacketType allow the Buffer to know which packet protocol is writing.
type BufferPacketType int
Expand Down Expand Up @@ -61,15 +63,16 @@
}
}

// available returns true if the buffer is large enough to fit a packet
// of the given size, taking overhead into account.
// available returns true if the buffer is large enough to fit a buffer
// of the given size, not taking overhead into account.
func (b *Buffer) available(size int) bool {
available := b.head - b.tail
if available <= 0 {
available += len(b.data)
}
// we interpret head=tail as empty, so always keep a byte free
if size+2+1 > available {
// the method
if size+1 > available {
return false
}

Expand Down Expand Up @@ -119,12 +122,70 @@
return nil
}

func (b *Buffer) consumeByte() byte {
byteRead := b.data[b.head]
b.head++
if b.head >= len(b.data) {
b.head = 0
}

return byteRead
}

func (b *Buffer) writeByte(x byte) {
b.data[b.tail] = x
b.tail++
if b.tail >= len(b.data) {
b.tail = 0
}
}

func (b *Buffer) writeLengthHeaders(length int) {
n1 := uint8(length >> 8) //nolint:gosec
n2 := uint8(length) //nolint:gosec
b.writeByte(n1)
b.writeByte(n2)
}

func (b *Buffer) getSegmentLength() int {
n1 := b.consumeByte()
n2 := b.consumeByte()
l := int((uint16(n1) << 8) | uint16(n2))

return l
}

// The caller should make sure the input buffer (in []byte)
// can be completely accomodated in b.data .

Check failure on line 159 in packetio/buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

`accomodated` is a misspelling of `accommodated` (misspell)
func (b *Buffer) writeFromInputBuffer(in []byte) {
n := copy(b.data[b.tail:], in)
b.tail += n
if b.tail >= len(b.data) {
// we reached the end, wrap around
m := copy(b.data, in[n:])
b.tail = m
}
}

// Write appends a copy of the packet data to the buffer.
// Returns ErrFull if the packet doesn't fit.
//
// Note that the packet size is limited to 65536 bytes since v0.11.0 due to the internal data structure.
func (b *Buffer) Write(packet []byte) (int, error) { //nolint:cyclop
if len(packet) >= 0x10000 {
func (b *Buffer) Write(buff []byte) (n int, err error) { //nolint:cyclop
return b.WriteWithAttributes(buff, nil)
}
Comment on lines +174 to +176
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't change the normal write path, this is a behavior change, and it will break applications that depend on this logic.

Copy link
Author

@penhauer penhauer Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Joe, I will address the comments one by one. Thank you for your great feedback.

No it will not break it. Whether you write the a packet with attributes or no attributes (nil in this case), an additional byte segment will be stored in the buffer. The buffer saves the packet length as two bytes first, and then the packet itself. With my changes, every written packet will have two segments. The first segment is like before, so two bytes for length and the rest for the packet. The second segment stores the attributes, first the attributes length (two bytes for that) and the attributes payload. When reading from buffer, regardless of method, both segments are consumed from the buffer (the packet and the payload itself). When writing into the buffer, regardless of having attributes present or not, a segment of size 2+ (2 for the segment length) will be stored in the buffer. Now if the attribtues are nil, only the empty segment's length (=2) will be stored.

Also, If it were to break anything, I couldn't have my sender and receiver working. However, they work fine and all the ICE candidate exchange and RTP/RTCP exchange work properly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not the only users of pion/transport, even if receive and send works for us. If a user writes 2 bytes using Write and now it writes 4 bytes. this is a breaking behavior, even if the API itself didn't change.
We can add like a private flag or something hasAttributes.

Copy link
Author

@penhauer penhauer Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No you're not on the right track. What does a user of PacketBuffer see? They are concerned with writing a packet properly to the buffer, and reading one packet properly from the buffer. The internals of the buffer are not the user's business.

In your case, we write 2 bytes (2 bytes only for the payload of the packet) to the buffer but:

  • We return 2 to the user (only the length of payload's bytes written to the packet buffer) so the user does not even understand more bytes are stored actually
  • When we read from the buffer, we consume all the 4 bytes written and still tel the user we read 2 bytes so no behavior change here either.

P.S. Is your concern the method Size? I didn't pay attention, yes the behavior of Size shouldn't change you're right. I thought it's internal to the buffer. I will fix this.

Copy link
Member

@JoTurk JoTurk Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct, because in packetio.Buffer the internal byte count directly affects observable behavior through:

  1. Size()
  2. SetLimitSize()
  3. SetLimitCount()
  4. backpressure via Write() returning ErrFull when the buffer is full, now we have extra bytes.
  5. how grow() decides the allocation size
  6. how many packets fit before being dropped/rejected

So even if Write() returns the same n and Read() returns the same n, the extra internal bytes materially change the buffer's behavior, it's not just about size.

Copy link
Author

@penhauer penhauer Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I see your point. Thanks for your patience. Yes you're right. I checked the methods that call Size() in other repos. It seems like they are not using it so it's almost internal method. For SetLimitSize(), it seems like the callers are not that exact in the set size as they set the size to 100*1000 for example, so the additional 2 bytes practically change nothing in the behavior. Now If you say we should maintain the exact exact same behavior I will do this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, and again i'm sorry, introducing / changing APIs is always hard, and pion/transport touches every library we have.


// WriteWithAttributes works like Write, but it writes the
// additional packet attributes into the Buffer.
func (b *Buffer) WriteWithAttributes(packet []byte,

Check failure on line 180 in packetio/buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

calculated cyclomatic complexity for function WriteWithAttributes is 13, max is 10 (cyclop)
attr *transport.PacketAttributes) (int, error) { //nolint:cyclop
pLen := len(packet)
aLen := 0
if attr != nil {
aLen = len(attr.Buffer)
}

if pLen >= 0x10000 {
return 0, errPacketTooBig
}

Expand All @@ -136,15 +197,17 @@
return 0, io.ErrClosedPipe
}

// two header bytes per segment indicating the length of the stored segment
lenWithHeaders := (2 + pLen) + (2 + aLen)
if (b.limitCount > 0 && b.count >= b.limitCount) ||
(b.limitSize > 0 && b.size()+2+len(packet) > b.limitSize) {
(b.limitSize > 0 && b.size()+lenWithHeaders > b.limitSize) {
b.mutex.Unlock()

return 0, ErrFull
}

// grow the buffer until the packet fits
for !b.available(len(packet)) {
for !b.available(lenWithHeaders) {
err := b.grow()
if err != nil {
b.mutex.Unlock()
Expand All @@ -154,41 +217,61 @@
}

// store the length of the packet
b.data[b.tail] = uint8(len(packet) >> 8) //nolint:gosec
b.tail++
if b.tail >= len(b.data) {
b.tail = 0
}
b.data[b.tail] = uint8(len(packet)) //nolint:gosec
b.tail++
if b.tail >= len(b.data) {
b.tail = 0
}
b.writeLengthHeaders(pLen)

// store the packet
n := copy(b.data[b.tail:], packet)
b.tail += n
if b.tail >= len(b.data) {
// we reached the end, wrap around
m := copy(b.data, packet[n:])
b.tail = m
}
b.writeFromInputBuffer(packet)

// increment the number of packets in buffer
b.count++

// store the length of the attributes segment
b.writeLengthHeaders(aLen)

if aLen > 0 {
// store the attributes buffer itself
b.writeFromInputBuffer(attr.Buffer)
}

select {
case b.notify <- struct{}{}:
default:
}
b.mutex.Unlock()

return len(packet), nil
return pLen, nil
}

func (b *Buffer) writeToInputBuffer(in []byte, length int) {
if b.head+length < len(b.data) {
copy(in, b.data[b.head:b.head+length])
} else {
k := copy(in, b.data[b.head:])
copy(in[k:], b.data[:length-k])
}
}

func (b *Buffer) advanceHead(count int) {
b.head += count
if b.head >= len(b.data) {
b.head -= len(b.data)
}
}

// Read populates the given byte slice, returning the number of bytes read.
// Blocks until data is available or the buffer is closed.
// Returns io.ErrShortBuffer is the packet is too small to copy the Write.
// Returns io.EOF if the buffer is closed.
func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit,cyclop
func (b *Buffer) Read(buff []byte) (n int, err error) { //nolint:gocognit,cyclop
n, err = b.ReadWithAttributes(buff, nil)

return
}

// ReadWithAttributes works like Read, but it also populates
// additional packet attributes into the attr field.
func (b *Buffer) ReadWithAttributes(

Check failure on line 273 in packetio/buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

calculated cyclomatic complexity for function ReadWithAttributes is 13, max is 10 (cyclop)
packet []byte, attr *transport.PacketAttributes) (n int, err error) { //nolint:gocognit,cyclop
// Return immediately if the deadline is already exceeded.
select {
case <-b.readDeadline.Done():
Expand All @@ -201,17 +284,7 @@

if b.head != b.tail { //nolint:nestif
// decode the packet size
n1 := b.data[b.head]
b.head++
if b.head >= len(b.data) {
b.head = 0
}
n2 := b.data[b.head]
b.head++
if b.head >= len(b.data) {
b.head = 0
}
count := int((uint16(n1) << 8) | uint16(n2))
count := b.getSegmentLength()

// determine the number of bytes we'll actually copy
copied := count
Expand All @@ -220,18 +293,13 @@
}

// copy the data
if b.head+copied < len(b.data) {
copy(packet, b.data[b.head:b.head+copied])
} else {
k := copy(packet, b.data[b.head:])
copy(packet[k:], b.data[:copied-k])
}
b.writeToInputBuffer(packet, copied)

// advance head, discarding any data that wasn't copied
b.head += count
if b.head >= len(b.data) {
b.head -= len(b.data)
}
b.advanceHead(count)

// read the attributes segment
attrErr := b.readPacketAttributes(attr)

if b.head == b.tail {
// the buffer is empty, reset to beginning
Expand All @@ -244,9 +312,17 @@
b.mutex.Unlock()

if copied < count {
// The method still consumes the buffer even in the cases where
// packet buffer is short. but even in this case copies into the packet buffer
// and discards it.
return copied, io.ErrShortBuffer
}

// only if attr != nil do we care about attributes' buffer
if attr != nil && attrErr != nil {
return copied, attrErr
}

return copied, nil
}

Expand All @@ -265,6 +341,27 @@
}
}

func (b *Buffer) readPacketAttributes(attr *transport.PacketAttributes) error {
aLen := b.getSegmentLength()
if aLen == 0 {
return nil
}
if attr == nil {
b.advanceHead(aLen)
return nil

Check failure on line 351 in packetio/buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

return with no blank line before (nlreturn)
}
aBuffer := attr.Buffer
b.writeToInputBuffer(aBuffer, aLen)
b.advanceHead(aLen)
if len(aBuffer) >= aLen {
attr.BytesCopied = aLen
return nil
}

attr.BytesCopied = len(aBuffer)
return errPacketAttributesBufferTooShort

Check failure on line 362 in packetio/buffer.go

View workflow job for this annotation

GitHub Actions / lint / Go

return with no blank line before (nlreturn)
}

// Close the buffer, unblocking any pending reads.
// Data in the buffer can still be read, Read will return io.EOF only when empty.
func (b *Buffer) Close() (err error) {
Expand Down
Loading
Loading