diff --git a/association.go b/association.go index 838ae62b..25d825e6 100644 --- a/association.go +++ b/association.go @@ -228,6 +228,7 @@ type Association struct { // Congestion control parameters maxReceiveBufferSize uint32 maxMessageSize uint32 + rtoMax float64 cwnd uint32 // my congestion window size rwnd uint32 // calculated peer's receiver windows size ssthresh uint32 // slow start threshold @@ -248,9 +249,9 @@ type Association struct { ackTimer *ackTimer // RACK / TLP state + rack rackSettings // rack configurable options rackReoWnd time.Duration // dynamic reordering window rackMinRTT time.Duration // min observed RTT - rackMinRTTWnd *windowedMin // the window used to determine minRTT, defaults to 30s rackDeliveredTime time.Time // send time of most recently delivered original chunk rackHighestDeliveredOrigTSN uint32 rackReorderingSeen bool // ever observed reordering for this association @@ -266,9 +267,6 @@ type Association struct { rackDeadline time.Time ptoDeadline time.Time - rackWCDelAck time.Duration // 200ms default - rackReoWndFloor time.Duration - // Chunks stored for retransmission storedInit *chunkInit storedCookieEcho *chunkCookieEcho @@ -318,16 +316,17 @@ type Association struct { // Config collects the arguments to createAssociation construction into // a single structure. type Config struct { - Name string - NetConn net.Conn - MaxReceiveBufferSize uint32 - MaxMessageSize uint32 - EnableZeroChecksum bool - LoggerFactory logging.LoggerFactory - BlockWrite bool - MTU uint32 + LoggerFactory logging.LoggerFactory + Name string + NetConn net.Conn + + BlockWrite bool + EnableZeroChecksum bool + MTU uint32 // congestion control configuration + MaxReceiveBufferSize uint32 + MaxMessageSize uint32 // RTOMax is the maximum retransmission timeout in milliseconds RTOMax float64 // Minimum congestion window @@ -337,14 +336,8 @@ type Config struct { // Step of congestion window increase at Congestion Avoidance CwndCAStep uint32 - // The RACK configs are currently private as SCTP will be reworked to use the - // modern options pattern in a future release. - // Optional: size of window used to determine minimum RTT for RACK (defaults to 30s) - rackMinRTTWnd time.Duration - // Optional: cap the minimum reordering window: 0 = use quarter-RTT - rackReoWndFloor time.Duration - // Optional: receiver worst-case delayed-ACK for PTO when only one packet is in flight - rackWCDelAck time.Duration + // RACK config options + rack rackSettings } // Server accepts a SCTP stream over a conn. @@ -390,6 +383,21 @@ func createClientWithContext(ctx context.Context, config Config) (*Association, } } +// newAssociationWithOptions finalizes a pre-configured association with optional overrides. +// +//nolint:gocognit,cyclop +func newAssociationWithOptions(assoc *Association, opts ...AssociationOption) (*Association, error) { + var err error + + for _, opt := range opts { + if err = opt(assoc); err != nil { + return nil, err + } + } + + return assoc, nil +} + func createAssociation(config Config) *Association { maxReceiveBufferSize := config.MaxReceiveBufferSize if maxReceiveBufferSize == 0 { @@ -406,6 +414,8 @@ func createAssociation(config Config) *Association { mtu = initialMTU } + rtoMax := config.RTOMax + tsn := globalMathRandomGenerator.Uint32() assoc := &Association{ netConn: config.NetConn, @@ -459,18 +469,20 @@ func createAssociation(config Config) *Association { assoc.tlrBurstLaterRTTUnits = tlrBurstDefaultLaterRTT // RACK defaults - assoc.rackWCDelAck = config.rackWCDelAck - if assoc.rackWCDelAck == 0 { - assoc.rackWCDelAck = 200 * time.Millisecond // WCDelAckT, RACK for SCTP section 2C + assoc.rack.rackWCDelAck = config.rack.rackWCDelAck + if assoc.rack.rackWCDelAck == 0 { + assoc.rack.rackWCDelAck = 200 * time.Millisecond // WCDelAckT, RACK for SCTP section 2C } - // defaults to 30s window to determine minRTT - assoc.rackMinRTTWnd = newWindowedMin(config.rackMinRTTWnd) + assoc.rack.rackMinRTTWnd = config.rack.rackMinRTTWnd + if assoc.rack.rackMinRTTWnd == nil { + assoc.rack.rackMinRTTWnd = newWindowedMin(30 * time.Second) + } assoc.timerUpdateCh = make(chan struct{}, 1) go assoc.timerLoop() - assoc.rackReoWndFloor = config.rackReoWndFloor // optional floor; usually 0 + assoc.rack.rackReoWndFloor = config.rack.rackReoWndFloor // optional floor; usually 0 assoc.rackKeepInflatedRecoveries = 0 if assoc.name == "" { @@ -486,11 +498,11 @@ func createAssociation(config Config) *Association { assoc.name, assoc.CWND(), assoc.ssthresh, assoc.inflightQueue.getNumBytes()) assoc.srtt.Store(float64(0)) - assoc.t1Init = newRTXTimer(timerT1Init, assoc, maxInitRetrans, config.RTOMax) - assoc.t1Cookie = newRTXTimer(timerT1Cookie, assoc, maxInitRetrans, config.RTOMax) - assoc.t2Shutdown = newRTXTimer(timerT2Shutdown, assoc, noMaxRetrans, config.RTOMax) - assoc.t3RTX = newRTXTimer(timerT3RTX, assoc, noMaxRetrans, config.RTOMax) - assoc.tReconfig = newRTXTimer(timerReconfig, assoc, noMaxRetrans, config.RTOMax) + assoc.t1Init = newRTXTimer(timerT1Init, assoc, maxInitRetrans, rtoMax) + assoc.t1Cookie = newRTXTimer(timerT1Cookie, assoc, maxInitRetrans, rtoMax) + assoc.t2Shutdown = newRTXTimer(timerT2Shutdown, assoc, noMaxRetrans, rtoMax) + assoc.t3RTX = newRTXTimer(timerT3RTX, assoc, noMaxRetrans, rtoMax) + assoc.tReconfig = newRTXTimer(timerReconfig, assoc, noMaxRetrans, rtoMax) assoc.ackTimer = newAckTimer(assoc) return assoc @@ -1617,7 +1629,7 @@ func (a *Association) handleHeartbeatAck(c *chunkHeartbeatAck) { srtt := a.rtoMgr.setNewRTT(rttMs) a.srtt.Store(srtt) - a.rackMinRTTWnd.Push(now, now.Sub(sent)) + a.rack.rackMinRTTWnd.Push(now, now.Sub(sent)) a.log.Tracef("[%s] HB RTT: measured=%.3fms srtt=%.3fms rto=%.3fms", a.name, rttMs, srtt, a.rtoMgr.getRTO()) @@ -1955,7 +1967,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck) // use a window to determine minRtt instead of a global min // as the RTT can fluctuate, which can cause problems if going from a // high RTT to a low RTT. - a.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since)) + a.rack.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since)) a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f", a.name, rtt, srtt, a.rtoMgr.getRTO()) @@ -2012,7 +2024,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck) srtt := a.rtoMgr.setNewRTT(rtt) a.srtt.Store(srtt) - a.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since)) + a.rack.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since)) a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f", a.name, rtt, srtt, a.rtoMgr.getRTO()) @@ -3491,13 +3503,13 @@ func (a *Association) onRackAfterSACK( // nolint:gocognit,cyclop,gocyclo } // 2) Maintain ReoWND (RACK for SCTP section 2B) - if minRTT := a.rackMinRTTWnd.Min(currTime); minRTT > 0 { + if minRTT := a.rack.rackMinRTTWnd.Min(currTime); minRTT > 0 { a.rackMinRTT = minRTT } var base time.Duration if a.rackMinRTT > 0 { - base = max(a.rackMinRTT/4, a.rackReoWndFloor) + base = max(a.rackMinRTT/4, a.rack.rackReoWndFloor) } // Suppress during recovery if no reordering ever seen; else (re)initialize from base if zero. @@ -3510,7 +3522,7 @@ func (a *Association) onRackAfterSACK( // nolint:gocognit,cyclop,gocyclo // DSACK-style inflation using SCTP duplicate TSNs (RACK for SCTP section 3 noting SCTP // natively reports duplicates + RACK for SCTP section 2B policy) if len(sack.duplicateTSN) > 0 && a.rackMinRTT > 0 { - a.rackReoWnd += max(a.rackMinRTT/4, a.rackReoWndFloor) + a.rackReoWnd += max(a.rackMinRTT/4, a.rack.rackReoWndFloor) // keep inflated for 16 loss recoveries before reset a.rackKeepInflatedRecoveries = 16 a.log.Tracef("[%s] RACK: DSACK/dupTSN seen, inflate reoWnd to %v", a.name, a.rackReoWnd) @@ -3608,7 +3620,7 @@ func (a *Association) onRackAfterSACK( // nolint:gocognit,cyclop,gocyclo extra := 2 * time.Millisecond if a.inflightQueue.size() == 1 { - extra = a.rackWCDelAck // 200ms for single outstanding, else 2ms + extra = a.rack.rackWCDelAck // 200ms for single outstanding, else 2ms } pto = 2*srtt + extra @@ -3634,7 +3646,7 @@ func (a *Association) schedulePTOAfterSendLocked() { extra := 2 * time.Millisecond if a.inflightQueue.size() == 1 { - extra = a.rackWCDelAck + extra = a.rack.rackWCDelAck } pto = 2*srtt + extra diff --git a/association_options.go b/association_options.go new file mode 100644 index 00000000..b1d0a3fa --- /dev/null +++ b/association_options.go @@ -0,0 +1,140 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package sctp + +import ( + "net" + + "github.com/pion/logging" +) + +// AssociationOption represents a function that can be used to configure an Association. +type AssociationOption func(*Association) error + +// WithLoggerFactory sets the logger factory for the association. +func WithLoggerFactory(loggerFactory logging.LoggerFactory) AssociationOption { + return func(a *Association) error { + a.log = loggerFactory.NewLogger("sctp") + + return nil + } +} + +// WithName sets the name of the association. +func WithName(name string) AssociationOption { + return func(a *Association) error { + a.name = name + + return nil + } +} + +// WithNetConn sets the net.Conn used by the association. +func WithNetConn(net net.Conn) AssociationOption { + return func(a *Association) error { + a.netConn = net + + return nil + } +} + +// WithBlockWrite sets whether the association should use blocking writes. +// By default this is false. +func WithBlockWrite(b bool) AssociationOption { + return func(a *Association) error { + a.blockWrite = b + + return nil + } +} + +// WithEnableZeroChecksum sets whether the association should accept zero as a valid checksum. +// By default this is false. +func WithEnableZeroChecksum(b bool) AssociationOption { + return func(a *Association) error { + a.recvZeroChecksum = b + + return nil + } +} + +// WithMTU sets the MTU size for the association. +// By default this is 1228. +func WithMTU(size uint32) AssociationOption { + return func(a *Association) error { + if size == 0 { + return ErrZeroMTUOption + } + a.mtu = size + + return nil + } +} + +// Congestion control options // + +// WithMaxReceiveBufferSize sets the maximum receive buffer size for the association. +// By default this is 1024 * 1024 = 1048576. +func WithMaxReceiveBufferSize(size uint32) AssociationOption { + return func(a *Association) error { + if size == 0 { + return ErrZeroMaxReceiveBufferOption + } + a.maxReceiveBufferSize = size + + return nil + } +} + +// WithMaxMessageSize sets the maximum message size for the association. +// By default this is 65536. +func WithMaxMessageSize(size uint32) AssociationOption { + return func(a *Association) error { + if size == 0 { + return ErrZeroMaxMessageSize + } + a.maxMessageSize = size + + return nil + } +} + +// WithRTOMax sets the max retransmission timeout in ms for the association. +func WithRTOMax(rtoMax float64) AssociationOption { + return func(a *Association) error { + if rtoMax <= 0 { + return ErrInvalidRTOMax + } + a.rtoMax = rtoMax + + return nil + } +} + +// WithMinCwnd sets the minimum congestion window for the association. +func WithMinCwnd(minCwnd uint32) AssociationOption { + return func(a *Association) error { + a.minCwnd = minCwnd + + return nil + } +} + +// WithFastRtxWnd sets the fast retransmission window for the association. +func WithFastRtxWnd(fastRtxWnd uint32) AssociationOption { + return func(a *Association) error { + a.fastRtxWnd = fastRtxWnd + + return nil + } +} + +// WithCwndCAStep sets the congestion window congestion avoidance step for the association. +func WithCwndCAStep(cwndCAStep uint32) AssociationOption { + return func(a *Association) error { + a.cwndCAStep = cwndCAStep + + return nil + } +} diff --git a/association_rack_options.go b/association_rack_options.go new file mode 100644 index 00000000..ff7dc710 --- /dev/null +++ b/association_rack_options.go @@ -0,0 +1,81 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package sctp + +import ( + "time" +) + +// rackSettings holds the optional RACK related settings for an association. +type rackSettings struct { + // Optional: size of window used to determine minimum RTT for RACK (defaults to 30s) + rackMinRTTWnd *windowedMin + + // Optional: cap the minimum reordering window: 0 = use quarter-RTT + rackReoWndFloor time.Duration + + // Optional: receiver worst-case delayed-ACK for PTO when only one packet is in flight + rackWCDelAck time.Duration +} + +// AssociationRACKOption represents a function that can be used to configure an Association's RACK options. +type AssociationRACKOption func(*rackSettings) error + +// RACK config options // + +// WithRackMinRTTWnd sets the length of the local minimum window used to determine the minRTT. +// By default this is 30 seconds. +func WithRackMinRTTWnd(rackMinRTTWnd time.Duration) AssociationRACKOption { + return func(a *rackSettings) error { + if rackMinRTTWnd <= 0 { + return ErrInvalidRackMinRTTWnd + } + a.rackMinRTTWnd = newWindowedMin(rackMinRTTWnd) + + return nil + } +} + +// WithRackReoWndFloor sets the RACK reordering window floor for the association. +// By default this is 0. +func WithRackReoWndFloor(rackReoWndFloor time.Duration) AssociationRACKOption { + return func(a *rackSettings) error { + if rackReoWndFloor < 0 { + return ErrInvalidRackReoWndFloor + } + a.rackReoWndFloor = rackReoWndFloor + + return nil + } +} + +// WithRackWCDelAck sets the receiver worst-case delayed-ACK for PTO when only 1 packet is in flight. +// By default this is 200 ms. +func WithRackWCDelAck(rackWCDelAck time.Duration) AssociationRACKOption { + return func(a *rackSettings) error { + if rackWCDelAck <= 0 { + return ErrInvalidRackWcDelAck + } + a.rackWCDelAck = rackWCDelAck + + return nil + } +} + +// SetRACKOptions configures optional RACK settings using the above options. +// This also creates the new windowedMin slice used for tracking the minRTT. +func (a *Association) SetRACKOptions(opts ...AssociationRACKOption) error { + var err error + cfg := a.rack + + for _, opt := range opts { + if opt != nil { + if err = opt(&cfg); err != nil { + return err + } + } + } + + return nil +} diff --git a/association_test.go b/association_test.go index d48f5b8f..210f7844 100644 --- a/association_test.go +++ b/association_test.go @@ -4113,7 +4113,7 @@ func newRackTestAssoc(t *testing.T) *Association { // RACK defaults for tests assoc.rackReorderingSeen = false - assoc.rackReoWndFloor = 0 + assoc.rack.rackReoWndFloor = 0 // Have a non-zero SRTT so SRTT-bounding code runs deterministically. assoc.srtt.Store(float64(100.0)) // 100 ms @@ -4139,8 +4139,8 @@ func TestRACK_MarkLossOnACK(t *testing.T) { assoc.lock.Lock() - if assoc.rackMinRTTWnd == nil { - assoc.rackMinRTTWnd = newWindowedMin(30 * time.Second) + if assoc.rack.rackMinRTTWnd == nil { + assoc.rack.rackMinRTTWnd = newWindowedMin(30 * time.Second) } // MinRTT = 40ms → base reoWnd = 10ms @@ -4259,7 +4259,7 @@ func TestRACK_SuppressReoWndDuringRecovery_NoReorderingSeen(t *testing.T) { assert.Equal(t, time.Duration(0), assoc.rackReoWnd, "reoWnd should stay 0 until a minRTT sample exists") now := time.Now() - assoc.rackMinRTTWnd.Push(now, 120*time.Millisecond) + assoc.rack.rackMinRTTWnd.Push(now, 120*time.Millisecond) assoc.onRackAfterSACK(false, time.Time{}, 0, &chunkSelectiveAck{}) assert.Equal( diff --git a/errors.go b/errors.go new file mode 100644 index 00000000..f48801ce --- /dev/null +++ b/errors.go @@ -0,0 +1,33 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package sctp + +import ( + "errors" +) + +var ( + // ErrZeroMTUOption indicates that the MTU option was set to zero. + ErrZeroMTUOption = errors.New("MTU option cannot be set to zero") + + // ErrZeroMaxReceiveBufferOption indicates that the MTU option was set to zero. + ErrZeroMaxReceiveBufferOption = errors.New("MaxReceiveBuffer option cannot be set to zero") + + // ErrZeroMaxMessageSize indicates that the MTU option was set to zero. + ErrZeroMaxMessageSize = errors.New("MaxMessageSize option cannot be set to zero") + + // ErrInvalidRTOMax indicates that the RTO max was set to 0 or a negative value. + ErrInvalidRTOMax = errors.New("RTO max was set to <= 0") + + // ErrInvalidRackMinRTTWnd indicates the length of the local minimum window used to determine the + // minRTT was set to <= 0. + ErrInvalidRackMinRTTWnd = errors.New("RackMinRTT was set to <= 0") + + // ErrInvalidRackReoWndFloor indicates the length of the RACK reordering window floor was set to < 0. + ErrInvalidRackReoWndFloor = errors.New("RackReoWndFloor was set to < 0") + + // ErrInvalidWcDelAck indicates the receiver worst-case delayed-ACK for PTO when only 1 packet in flight + // was set to < 0. + ErrInvalidRackWcDelAck = errors.New("RackWcDelAck was set to <= 0") +)