Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Changelog

- [SA-1463] Return error when trying to create a new socket
- [SA-2074] Add a ReadPacket function
- [SA-2074] Add a ReadPacket function
- Add various fixes for SetDeadline functionality
57 changes: 41 additions & 16 deletions poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package srtgo
*/
import "C"
import (
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -19,20 +20,24 @@ const (

type PollMode int

// Issue 4
// The logic in setDeadline needs both ModeWrite and ModeRead to be non-zero.
// E.G., Previously, setDeadline(modeWrite) is equivalent to setDeadline(modeRead + modeWrite)
const (
ModeRead = PollMode(iota)
ModeUnknown = PollMode(iota)
ModeWrite
ModeRead
)

/*
pollDesc contains the polling state for the associated SrtSocket
closing: socket is closing, reject all poll operations
pollErr: an error occured on the socket, indicates it's not useable anymore.
unblockRd: is used to unblock the poller when the socket becomes ready for io
rdState: polling state for read operations
rdDeadline: deadline in NS before poll operation times out, -1 means timedout (needs to be cleared), 0 is without timeout
rdSeq: sequence number protects against spurious signalling of timeouts when timer is reset.
rdTimer: timer used to enforce deadline.
pollDesc contains the polling state for the associated SrtSocket
closing: socket is closing, reject all poll operations
pollErr: an error occured on the socket, indicates it's not useable anymore.
unblockRd: is used to unblock the poller when the socket becomes ready for io
rdState: polling state for read operations
rdDeadline: deadline in NS before poll operation times out, -1 means timedout (needs to be cleared), 0 is without timeout
rdSeq: sequence number protects against spurious signalling of timeouts when timer is reset.
rdTimer: timer used to enforce deadline.
*/
type pollDesc struct {
lock sync.Mutex
Expand All @@ -58,12 +63,16 @@ type pollDesc struct {

var pdPool = sync.Pool{
New: func() interface{} {
return &pollDesc{
desc := &pollDesc{
unblockRd: make(chan interface{}, 1),
unblockWr: make(chan interface{}, 1),
rdTimer: time.NewTimer(0),
wdTimer: time.NewTimer(0),
}
// Creating timer with 0 duration makes it fire right away. Read the channel to prevent a false fire later.
<-desc.rdTimer.C
<-desc.wdTimer.C
return desc
},
}

Expand Down Expand Up @@ -94,6 +103,7 @@ func (pd *pollDesc) release() {
}

func (pd *pollDesc) wait(mode PollMode) error {
fmt.Println("[nathan debug] wait start")
defer pd.reset(mode)
if err := pd.checkPollErr(mode); err != nil {
return err
Expand All @@ -104,10 +114,12 @@ func (pd *pollDesc) wait(mode PollMode) error {
timerSeq := int64(0)
pd.lock.Lock()
if mode == ModeRead {
fmt.Println("[nathan debug] setting up stuff for read mode")
timerSeq = pd.rtSeq
pd.rdLock.Lock()
defer pd.rdLock.Unlock()
} else if mode == ModeWrite {
fmt.Println("[nathan debug] setting up stuff for write mode")
timerSeq = pd.wtSeq
state = &pd.wrState
unblockChan = pd.unblockWr
Expand All @@ -133,8 +145,10 @@ wait:
for {
select {
case <-unblockChan:
fmt.Println("[nathan debug] unblock chan written to")
break wait
case <-expiryChan:
fmt.Println("[nathan debug] expiry chan written to")
pd.lock.Lock()
if mode == ModeRead {
if timerSeq == pd.rdSeq {
Expand All @@ -156,6 +170,7 @@ wait:
}
}
err := pd.checkPollErr(mode)
fmt.Println("[nathan debug] wait end")
return err
}

Expand Down Expand Up @@ -200,28 +215,38 @@ func (pd *pollDesc) setDeadline(t time.Time, mode PollMode) {
if mode == ModeRead || mode == ModeRead+ModeWrite {
pd.rdSeq++
pd.rtSeq = pd.rdSeq
if pd.rdDeadline > 0 {
pd.rdTimer.Stop()
// Issue 2
// Previously, there was a problem here according to https://github.com/Haivision/srtgo/pull/63.
// "If there is a delay between setting up deadlines, and the timer fired after the poll.Wait() had returned, that timer signal is not cleared from the channel and will cause wait to return immediately."
if pd.rdDeadline > 0 && !pd.rdTimer.Stop() {
<-pd.rdTimer.C
}
pd.rdDeadline = d
if d > 0 {
pd.rdTimer.Reset(time.Duration(d))
timeResetReturnValue := pd.rdTimer.Reset(time.Duration(d))
fmt.Printf("[nathan debug] finished setting up read deadline timer with duration: %.2f seconds. Timer was previously active: %t \n", time.Duration(d).Seconds(), timeResetReturnValue)
}
if d < 0 {
fmt.Println("[nathan debug] read deadline is in past, so setting read to unblock")
pd.unblock(ModeRead, false, false)
}
}
if mode == ModeWrite || mode == ModeRead+ModeWrite {
pd.wdSeq++
pd.wtSeq = pd.wdSeq
if pd.wdDeadline > 0 {
pd.wdTimer.Stop()
// Issue 2
// Previously, there was a problem here according to https://github.com/Haivision/srtgo/pull/63.
// "If there is a delay between setting up deadlines, and the timer fired after the poll.Wait() had returned, that timer signal is not cleared from the channel and will cause wait to return immediately."
if pd.wdDeadline > 0 && !pd.wdTimer.Stop() {
<-pd.wdTimer.C
}
pd.wdDeadline = d
if d > 0 {
pd.wdTimer.Reset(time.Duration(d))
timeResetReturnValue := pd.wdTimer.Reset(time.Duration(d))
fmt.Printf("[nathan debug] finished setting up write deadline timer with duration: %.2f seconds. Timer was previously active: %t \n", time.Duration(d).Seconds(), timeResetReturnValue)
}
if d < 0 {
fmt.Println("[nathan debug] write deadline is in past, so setting write to unblock")
pd.unblock(ModeWrite, false, false)
}
}
Expand Down
21 changes: 19 additions & 2 deletions read.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ int srt_recvmsg2_wrapped(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL *mctrl, in
import "C"
import (
"errors"
"fmt"
"syscall"
"unsafe"
)
Expand Down Expand Up @@ -44,11 +45,19 @@ func (s SrtSocket) Read(b []byte) (n int, err error) {
}
n, err = srtRecvMsg2Impl(s.socket, b, nil)

// Issue 3
// Previously, this for loop will never break if the deadline has been hit (return value of s.pd.wait(ModeRead))
for {
if !errors.Is(err, error(EAsyncRCV)) || s.blocking {
return
}
s.pd.wait(ModeRead)

// E.G., Error because we reached timed out. Like we do for connect.
if err = s.pd.wait(ModeRead); err != nil {
fmt.Printf("[nathan debug] Read waiting had error: %v\n", err)
return
}

n, err = srtRecvMsg2Impl(s.socket, b, nil)
}
}
Expand All @@ -73,14 +82,22 @@ func (s SrtSocket) ReadPacket(packet *SrtPacket) (n int, err error) {

n, err = srtRecvMsg2Impl(s.socket, packet.Buffer, (*C.SRT_MSGCTRL)(unsafe.Pointer(&msgctrl)))

// Issue 3
// Previously, this for loop will never break if the deadline has been hit (return value of s.pd.wait(ModeRead))
for {
if !errors.Is(err, error(EAsyncRCV)) || s.blocking {
// this must include when the socket is closed, since I've seen this exit without the below fix
packet.Pktseq = int32(msgctrl.pktseq)
packet.Msgno = int32(msgctrl.msgno)
packet.Srctime = int64(msgctrl.srctime)
}

// E.G., Error because we reached timed out. Like we do for connect.
if err = s.pd.wait(ModeRead); err != nil {
fmt.Printf("[nathan debug] ReadPacket waiting had error: %v\n", err)
return
}
s.pd.wait(ModeRead)

n, err = srtRecvMsg2Impl(s.socket, packet.Buffer, (*C.SRT_MSGCTRL)(unsafe.Pointer(&msgctrl)))
}
}
7 changes: 7 additions & 0 deletions srtgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,21 +201,28 @@ func (s *SrtSocket) Listen(backlog int) error {

// Connect to a remote endpoint
func (s *SrtSocket) Connect() error {
fmt.Println("[nathan debug] socket.Connect start")
runtime.LockOSThread()
defer runtime.UnlockOSThread()
sa, salen, err := CreateAddrInet(s.host, s.port)
if err != nil {
return err
}

fmt.Println("[nathan debug] actual connect start")
res := C.srt_connect(s.socket, sa, C.int(salen))
fmt.Println("[nathan debug] actual connect end")
if res == SRT_ERROR {
fmt.Println("[nathan debug] actual connect had SRT error!")
C.srt_close(s.socket)
return srtGetAndClearError()
}

fmt.Printf("[nathan debug] s.blocking: %t\n", s.blocking)
if !s.blocking {
fmt.Println("[nathan debug] waiting...")
if err := s.pd.wait(ModeWrite); err != nil {
fmt.Printf("[nathan debug] waiting had error: %v\n", err)
return err
}
}
Expand Down
9 changes: 8 additions & 1 deletion write.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,18 @@ func (s SrtSocket) Write(b []byte) (n int, err error) {
}
n, err = srtSendMsg2Impl(s.socket, b, nil)

// Issue 3
// Previously, this for loop would never break if the deadline has been hit (as per return value of s.pd.wait(ModeWrite))
for {
if !errors.Is(err, error(EAsyncSND)) || s.blocking {
return
}
s.pd.wait(ModeWrite)

// E.G., Error because we reached timed out. Like we do for connect.
if err = s.pd.wait(ModeWrite); err != nil {
return
}

n, err = srtSendMsg2Impl(s.socket, b, nil)
}
}