Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 51 additions & 39 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
// Congestion control parameters
maxReceiveBufferSize uint32
maxMessageSize uint32
rtoMax float64
cwnd uint32 // my congestion window size
rwnd uint32 // calculated peer's receiver windows size
ssthresh uint32 // slow start threshold
Expand All @@ -248,9 +249,9 @@
ackTimer *ackTimer

// RACK / TLP state
rack rackSettings // rack configurable options
rackReoWnd time.Duration // dynamic reordering window
rackMinRTT time.Duration // min observed RTT
rackMinRTTWnd *windowedMin // the window used to determine minRTT, defaults to 30s
rackDeliveredTime time.Time // send time of most recently delivered original chunk
rackHighestDeliveredOrigTSN uint32
rackReorderingSeen bool // ever observed reordering for this association
Expand All @@ -266,9 +267,6 @@
rackDeadline time.Time
ptoDeadline time.Time

rackWCDelAck time.Duration // 200ms default
rackReoWndFloor time.Duration

// Chunks stored for retransmission
storedInit *chunkInit
storedCookieEcho *chunkCookieEcho
Expand Down Expand Up @@ -318,16 +316,17 @@
// Config collects the arguments to createAssociation construction into
// a single structure.
type Config struct {
Name string
NetConn net.Conn
MaxReceiveBufferSize uint32
MaxMessageSize uint32
EnableZeroChecksum bool
LoggerFactory logging.LoggerFactory
BlockWrite bool
MTU uint32
LoggerFactory logging.LoggerFactory
Name string
NetConn net.Conn

BlockWrite bool
EnableZeroChecksum bool
MTU uint32

// congestion control configuration
MaxReceiveBufferSize uint32
MaxMessageSize uint32
// RTOMax is the maximum retransmission timeout in milliseconds
RTOMax float64
// Minimum congestion window
Expand All @@ -337,14 +336,8 @@
// Step of congestion window increase at Congestion Avoidance
CwndCAStep uint32

// The RACK configs are currently private as SCTP will be reworked to use the
// modern options pattern in a future release.
// Optional: size of window used to determine minimum RTT for RACK (defaults to 30s)
rackMinRTTWnd time.Duration
// Optional: cap the minimum reordering window: 0 = use quarter-RTT
rackReoWndFloor time.Duration
// Optional: receiver worst-case delayed-ACK for PTO when only one packet is in flight
rackWCDelAck time.Duration
// RACK config options
rack rackSettings
}

// Server accepts a SCTP stream over a conn.
Expand Down Expand Up @@ -390,6 +383,21 @@
}
}

// newAssociationWithOptions finalizes a pre-configured association with optional overrides.
//
//nolint:gocognit,cyclop
func newAssociationWithOptions(assoc *Association, opts ...AssociationOption) (*Association, error) {

Check failure on line 389 in association.go

View workflow job for this annotation

GitHub Actions / lint / Go

func newAssociationWithOptions is unused (unused)
var err error

for _, opt := range opts {
if err = opt(assoc); err != nil {
return nil, err
}
}

return assoc, nil
}

func createAssociation(config Config) *Association {
maxReceiveBufferSize := config.MaxReceiveBufferSize
if maxReceiveBufferSize == 0 {
Expand All @@ -406,6 +414,8 @@
mtu = initialMTU
}

rtoMax := config.RTOMax

tsn := globalMathRandomGenerator.Uint32()
assoc := &Association{
netConn: config.NetConn,
Expand Down Expand Up @@ -459,18 +469,20 @@
assoc.tlrBurstLaterRTTUnits = tlrBurstDefaultLaterRTT

// RACK defaults
assoc.rackWCDelAck = config.rackWCDelAck
if assoc.rackWCDelAck == 0 {
assoc.rackWCDelAck = 200 * time.Millisecond // WCDelAckT, RACK for SCTP section 2C
assoc.rack.rackWCDelAck = config.rack.rackWCDelAck
if assoc.rack.rackWCDelAck == 0 {
assoc.rack.rackWCDelAck = 200 * time.Millisecond // WCDelAckT, RACK for SCTP section 2C
}

// defaults to 30s window to determine minRTT
assoc.rackMinRTTWnd = newWindowedMin(config.rackMinRTTWnd)
assoc.rack.rackMinRTTWnd = config.rack.rackMinRTTWnd
if assoc.rack.rackMinRTTWnd == nil {
assoc.rack.rackMinRTTWnd = newWindowedMin(30 * time.Second)
}

assoc.timerUpdateCh = make(chan struct{}, 1)
go assoc.timerLoop()

assoc.rackReoWndFloor = config.rackReoWndFloor // optional floor; usually 0
assoc.rack.rackReoWndFloor = config.rack.rackReoWndFloor // optional floor; usually 0
assoc.rackKeepInflatedRecoveries = 0

if assoc.name == "" {
Expand All @@ -486,11 +498,11 @@
assoc.name, assoc.CWND(), assoc.ssthresh, assoc.inflightQueue.getNumBytes())

assoc.srtt.Store(float64(0))
assoc.t1Init = newRTXTimer(timerT1Init, assoc, maxInitRetrans, config.RTOMax)
assoc.t1Cookie = newRTXTimer(timerT1Cookie, assoc, maxInitRetrans, config.RTOMax)
assoc.t2Shutdown = newRTXTimer(timerT2Shutdown, assoc, noMaxRetrans, config.RTOMax)
assoc.t3RTX = newRTXTimer(timerT3RTX, assoc, noMaxRetrans, config.RTOMax)
assoc.tReconfig = newRTXTimer(timerReconfig, assoc, noMaxRetrans, config.RTOMax)
assoc.t1Init = newRTXTimer(timerT1Init, assoc, maxInitRetrans, rtoMax)
assoc.t1Cookie = newRTXTimer(timerT1Cookie, assoc, maxInitRetrans, rtoMax)
assoc.t2Shutdown = newRTXTimer(timerT2Shutdown, assoc, noMaxRetrans, rtoMax)
assoc.t3RTX = newRTXTimer(timerT3RTX, assoc, noMaxRetrans, rtoMax)
assoc.tReconfig = newRTXTimer(timerReconfig, assoc, noMaxRetrans, rtoMax)
assoc.ackTimer = newAckTimer(assoc)

return assoc
Expand Down Expand Up @@ -1617,7 +1629,7 @@
srtt := a.rtoMgr.setNewRTT(rttMs)
a.srtt.Store(srtt)

a.rackMinRTTWnd.Push(now, now.Sub(sent))
a.rack.rackMinRTTWnd.Push(now, now.Sub(sent))

a.log.Tracef("[%s] HB RTT: measured=%.3fms srtt=%.3fms rto=%.3fms",
a.name, rttMs, srtt, a.rtoMgr.getRTO())
Expand Down Expand Up @@ -1955,7 +1967,7 @@
// use a window to determine minRtt instead of a global min
// as the RTT can fluctuate, which can cause problems if going from a
// high RTT to a low RTT.
a.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))
a.rack.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))

a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
a.name, rtt, srtt, a.rtoMgr.getRTO())
Expand Down Expand Up @@ -2012,7 +2024,7 @@
srtt := a.rtoMgr.setNewRTT(rtt)
a.srtt.Store(srtt)

a.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))
a.rack.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))

a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
a.name, rtt, srtt, a.rtoMgr.getRTO())
Expand Down Expand Up @@ -3491,13 +3503,13 @@
}

// 2) Maintain ReoWND (RACK for SCTP section 2B)
if minRTT := a.rackMinRTTWnd.Min(currTime); minRTT > 0 {
if minRTT := a.rack.rackMinRTTWnd.Min(currTime); minRTT > 0 {
a.rackMinRTT = minRTT
}

var base time.Duration
if a.rackMinRTT > 0 {
base = max(a.rackMinRTT/4, a.rackReoWndFloor)
base = max(a.rackMinRTT/4, a.rack.rackReoWndFloor)
}

// Suppress during recovery if no reordering ever seen; else (re)initialize from base if zero.
Expand All @@ -3510,7 +3522,7 @@
// DSACK-style inflation using SCTP duplicate TSNs (RACK for SCTP section 3 noting SCTP
// natively reports duplicates + RACK for SCTP section 2B policy)
if len(sack.duplicateTSN) > 0 && a.rackMinRTT > 0 {
a.rackReoWnd += max(a.rackMinRTT/4, a.rackReoWndFloor)
a.rackReoWnd += max(a.rackMinRTT/4, a.rack.rackReoWndFloor)
// keep inflated for 16 loss recoveries before reset
a.rackKeepInflatedRecoveries = 16
a.log.Tracef("[%s] RACK: DSACK/dupTSN seen, inflate reoWnd to %v", a.name, a.rackReoWnd)
Expand Down Expand Up @@ -3608,7 +3620,7 @@
extra := 2 * time.Millisecond

if a.inflightQueue.size() == 1 {
extra = a.rackWCDelAck // 200ms for single outstanding, else 2ms
extra = a.rack.rackWCDelAck // 200ms for single outstanding, else 2ms
}

pto = 2*srtt + extra
Expand All @@ -3634,7 +3646,7 @@
extra := 2 * time.Millisecond

if a.inflightQueue.size() == 1 {
extra = a.rackWCDelAck
extra = a.rack.rackWCDelAck
}

pto = 2*srtt + extra
Expand Down
140 changes: 140 additions & 0 deletions association_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package sctp

import (
"net"

"github.com/pion/logging"
)

// AssociationOption represents a function that can be used to configure an Association.
type AssociationOption func(*Association) error

// WithLoggerFactory sets the logger factory for the association.
func WithLoggerFactory(loggerFactory logging.LoggerFactory) AssociationOption {
return func(a *Association) error {
a.log = loggerFactory.NewLogger("sctp")

return nil
}
}

// WithName sets the name of the association.
func WithName(name string) AssociationOption {
return func(a *Association) error {
a.name = name

return nil
}
}

// WithNetConn sets the net.Conn used by the association.
func WithNetConn(net net.Conn) AssociationOption {
return func(a *Association) error {
a.netConn = net

return nil
}
}

// WithBlockWrite sets whether the association should use blocking writes.
// By default this is false.
func WithBlockWrite(b bool) AssociationOption {
return func(a *Association) error {
a.blockWrite = b

return nil
}
}

// WithEnableZeroChecksum sets whether the association should accept zero as a valid checksum.
// By default this is false.
func WithEnableZeroChecksum(b bool) AssociationOption {
return func(a *Association) error {
a.recvZeroChecksum = b

return nil
}
}

// WithMTU sets the MTU size for the association.
// By default this is 1228.
func WithMTU(size uint32) AssociationOption {
return func(a *Association) error {
if size == 0 {
return ErrZeroMTUOption
}
a.mtu = size

return nil
}
}

// Congestion control options //

// WithMaxReceiveBufferSize sets the maximum receive buffer size for the association.
// By default this is 1024 * 1024 = 1048576.
func WithMaxReceiveBufferSize(size uint32) AssociationOption {
return func(a *Association) error {
if size == 0 {
return ErrZeroMaxReceiveBufferOption
}
a.maxReceiveBufferSize = size

return nil
}
}

// WithMaxMessageSize sets the maximum message size for the association.
// By default this is 65536.
func WithMaxMessageSize(size uint32) AssociationOption {
return func(a *Association) error {
if size == 0 {
return ErrZeroMaxMessageSize
}
a.maxMessageSize = size

return nil
}
}

// WithRTOMax sets the max retransmission timeout in ms for the association.
func WithRTOMax(rtoMax float64) AssociationOption {
return func(a *Association) error {
if rtoMax <= 0 {
return ErrInvalidRTOMax
}
a.rtoMax = rtoMax

return nil
}
}

// WithMinCwnd sets the minimum congestion window for the association.
func WithMinCwnd(minCwnd uint32) AssociationOption {
return func(a *Association) error {
a.minCwnd = minCwnd

return nil
}
}

// WithFastRtxWnd sets the fast retransmission window for the association.
func WithFastRtxWnd(fastRtxWnd uint32) AssociationOption {
return func(a *Association) error {
a.fastRtxWnd = fastRtxWnd

return nil
}
}

// WithCwndCAStep sets the congestion window congestion avoidance step for the association.
func WithCwndCAStep(cwndCAStep uint32) AssociationOption {
return func(a *Association) error {
a.cwndCAStep = cwndCAStep

return nil
}
}
Loading
Loading