diff --git a/CHANGELOG.md b/CHANGELOG.md index 6014935..d0f6051 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ # Changelog - [SA-1463] Return error when trying to create a new socket -- [SA-2074] Add a ReadPacket function \ No newline at end of file +- [SA-2074] Add a ReadPacket function +- Add various fixes for SetDeadline functionality \ No newline at end of file diff --git a/poll.go b/poll.go index 336f686..48b0ebe 100644 --- a/poll.go +++ b/poll.go @@ -6,6 +6,7 @@ package srtgo */ import "C" import ( + "fmt" "sync" "sync/atomic" "time" @@ -19,20 +20,24 @@ const ( type PollMode int +// Issue 4 +// The logic in setDeadline needs both ModeWrite and ModeRead to be non-zero. +// E.G., Previously, setDeadline(modeWrite) is equivalent to setDeadline(modeRead + modeWrite) const ( - ModeRead = PollMode(iota) + ModeUnknown = PollMode(iota) ModeWrite + ModeRead ) /* - pollDesc contains the polling state for the associated SrtSocket - closing: socket is closing, reject all poll operations - pollErr: an error occured on the socket, indicates it's not useable anymore. - unblockRd: is used to unblock the poller when the socket becomes ready for io - rdState: polling state for read operations - rdDeadline: deadline in NS before poll operation times out, -1 means timedout (needs to be cleared), 0 is without timeout - rdSeq: sequence number protects against spurious signalling of timeouts when timer is reset. - rdTimer: timer used to enforce deadline. +pollDesc contains the polling state for the associated SrtSocket +closing: socket is closing, reject all poll operations +pollErr: an error occured on the socket, indicates it's not useable anymore. +unblockRd: is used to unblock the poller when the socket becomes ready for io +rdState: polling state for read operations +rdDeadline: deadline in NS before poll operation times out, -1 means timedout (needs to be cleared), 0 is without timeout +rdSeq: sequence number protects against spurious signalling of timeouts when timer is reset. +rdTimer: timer used to enforce deadline. */ type pollDesc struct { lock sync.Mutex @@ -58,12 +63,16 @@ type pollDesc struct { var pdPool = sync.Pool{ New: func() interface{} { - return &pollDesc{ + desc := &pollDesc{ unblockRd: make(chan interface{}, 1), unblockWr: make(chan interface{}, 1), rdTimer: time.NewTimer(0), wdTimer: time.NewTimer(0), } + // Creating timer with 0 duration makes it fire right away. Read the channel to prevent a false fire later. + <-desc.rdTimer.C + <-desc.wdTimer.C + return desc }, } @@ -94,6 +103,7 @@ func (pd *pollDesc) release() { } func (pd *pollDesc) wait(mode PollMode) error { + fmt.Println("[nathan debug] wait start") defer pd.reset(mode) if err := pd.checkPollErr(mode); err != nil { return err @@ -104,10 +114,12 @@ func (pd *pollDesc) wait(mode PollMode) error { timerSeq := int64(0) pd.lock.Lock() if mode == ModeRead { + fmt.Println("[nathan debug] setting up stuff for read mode") timerSeq = pd.rtSeq pd.rdLock.Lock() defer pd.rdLock.Unlock() } else if mode == ModeWrite { + fmt.Println("[nathan debug] setting up stuff for write mode") timerSeq = pd.wtSeq state = &pd.wrState unblockChan = pd.unblockWr @@ -133,8 +145,10 @@ wait: for { select { case <-unblockChan: + fmt.Println("[nathan debug] unblock chan written to") break wait case <-expiryChan: + fmt.Println("[nathan debug] expiry chan written to") pd.lock.Lock() if mode == ModeRead { if timerSeq == pd.rdSeq { @@ -156,6 +170,7 @@ wait: } } err := pd.checkPollErr(mode) + fmt.Println("[nathan debug] wait end") return err } @@ -200,28 +215,38 @@ func (pd *pollDesc) setDeadline(t time.Time, mode PollMode) { if mode == ModeRead || mode == ModeRead+ModeWrite { pd.rdSeq++ pd.rtSeq = pd.rdSeq - if pd.rdDeadline > 0 { - pd.rdTimer.Stop() + // Issue 2 + // Previously, there was a problem here according to https://github.com/Haivision/srtgo/pull/63. + // "If there is a delay between setting up deadlines, and the timer fired after the poll.Wait() had returned, that timer signal is not cleared from the channel and will cause wait to return immediately." + if pd.rdDeadline > 0 && !pd.rdTimer.Stop() { + <-pd.rdTimer.C } pd.rdDeadline = d if d > 0 { - pd.rdTimer.Reset(time.Duration(d)) + timeResetReturnValue := pd.rdTimer.Reset(time.Duration(d)) + fmt.Printf("[nathan debug] finished setting up read deadline timer with duration: %.2f seconds. Timer was previously active: %t \n", time.Duration(d).Seconds(), timeResetReturnValue) } if d < 0 { + fmt.Println("[nathan debug] read deadline is in past, so setting read to unblock") pd.unblock(ModeRead, false, false) } } if mode == ModeWrite || mode == ModeRead+ModeWrite { pd.wdSeq++ pd.wtSeq = pd.wdSeq - if pd.wdDeadline > 0 { - pd.wdTimer.Stop() + // Issue 2 + // Previously, there was a problem here according to https://github.com/Haivision/srtgo/pull/63. + // "If there is a delay between setting up deadlines, and the timer fired after the poll.Wait() had returned, that timer signal is not cleared from the channel and will cause wait to return immediately." + if pd.wdDeadline > 0 && !pd.wdTimer.Stop() { + <-pd.wdTimer.C } pd.wdDeadline = d if d > 0 { - pd.wdTimer.Reset(time.Duration(d)) + timeResetReturnValue := pd.wdTimer.Reset(time.Duration(d)) + fmt.Printf("[nathan debug] finished setting up write deadline timer with duration: %.2f seconds. Timer was previously active: %t \n", time.Duration(d).Seconds(), timeResetReturnValue) } if d < 0 { + fmt.Println("[nathan debug] write deadline is in past, so setting write to unblock") pd.unblock(ModeWrite, false, false) } } diff --git a/read.go b/read.go index b1ca7a5..2da0662 100644 --- a/read.go +++ b/read.go @@ -17,6 +17,7 @@ int srt_recvmsg2_wrapped(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL *mctrl, in import "C" import ( "errors" + "fmt" "syscall" "unsafe" ) @@ -44,11 +45,19 @@ func (s SrtSocket) Read(b []byte) (n int, err error) { } n, err = srtRecvMsg2Impl(s.socket, b, nil) + // Issue 3 + // Previously, this for loop will never break if the deadline has been hit (return value of s.pd.wait(ModeRead)) for { if !errors.Is(err, error(EAsyncRCV)) || s.blocking { return } - s.pd.wait(ModeRead) + + // E.G., Error because we reached timed out. Like we do for connect. + if err = s.pd.wait(ModeRead); err != nil { + fmt.Printf("[nathan debug] Read waiting had error: %v\n", err) + return + } + n, err = srtRecvMsg2Impl(s.socket, b, nil) } } @@ -73,14 +82,22 @@ func (s SrtSocket) ReadPacket(packet *SrtPacket) (n int, err error) { n, err = srtRecvMsg2Impl(s.socket, packet.Buffer, (*C.SRT_MSGCTRL)(unsafe.Pointer(&msgctrl))) + // Issue 3 + // Previously, this for loop will never break if the deadline has been hit (return value of s.pd.wait(ModeRead)) for { if !errors.Is(err, error(EAsyncRCV)) || s.blocking { + // this must include when the socket is closed, since I've seen this exit without the below fix packet.Pktseq = int32(msgctrl.pktseq) packet.Msgno = int32(msgctrl.msgno) packet.Srctime = int64(msgctrl.srctime) + } + + // E.G., Error because we reached timed out. Like we do for connect. + if err = s.pd.wait(ModeRead); err != nil { + fmt.Printf("[nathan debug] ReadPacket waiting had error: %v\n", err) return } - s.pd.wait(ModeRead) + n, err = srtRecvMsg2Impl(s.socket, packet.Buffer, (*C.SRT_MSGCTRL)(unsafe.Pointer(&msgctrl))) } } diff --git a/srtgo.go b/srtgo.go index 3167f1e..6e5d0fa 100644 --- a/srtgo.go +++ b/srtgo.go @@ -201,6 +201,7 @@ func (s *SrtSocket) Listen(backlog int) error { // Connect to a remote endpoint func (s *SrtSocket) Connect() error { + fmt.Println("[nathan debug] socket.Connect start") runtime.LockOSThread() defer runtime.UnlockOSThread() sa, salen, err := CreateAddrInet(s.host, s.port) @@ -208,14 +209,20 @@ func (s *SrtSocket) Connect() error { return err } + fmt.Println("[nathan debug] actual connect start") res := C.srt_connect(s.socket, sa, C.int(salen)) + fmt.Println("[nathan debug] actual connect end") if res == SRT_ERROR { + fmt.Println("[nathan debug] actual connect had SRT error!") C.srt_close(s.socket) return srtGetAndClearError() } + fmt.Printf("[nathan debug] s.blocking: %t\n", s.blocking) if !s.blocking { + fmt.Println("[nathan debug] waiting...") if err := s.pd.wait(ModeWrite); err != nil { + fmt.Printf("[nathan debug] waiting had error: %v\n", err) return err } } diff --git a/write.go b/write.go index 01cb8a7..554642e 100644 --- a/write.go +++ b/write.go @@ -45,11 +45,18 @@ func (s SrtSocket) Write(b []byte) (n int, err error) { } n, err = srtSendMsg2Impl(s.socket, b, nil) + // Issue 3 + // Previously, this for loop would never break if the deadline has been hit (as per return value of s.pd.wait(ModeWrite)) for { if !errors.Is(err, error(EAsyncSND)) || s.blocking { return } - s.pd.wait(ModeWrite) + + // E.G., Error because we reached timed out. Like we do for connect. + if err = s.pd.wait(ModeWrite); err != nil { + return + } + n, err = srtSendMsg2Impl(s.socket, b, nil) } }