diff --git a/handler.go b/handler.go index 77efc20..a744044 100644 --- a/handler.go +++ b/handler.go @@ -3,6 +3,7 @@ package moqtransport // Common Message types. Handlers can react to any of these messages. const ( MessageSubscribe = "SUBSCRIBE" + MessageUnsubscribe = "UNSUBSCRIBE " MessageFetch = "FETCH" MessageAnnounce = "ANNOUNCE" MessageAnnounceCancel = "ANNOUNCE_CANCEL" diff --git a/integrationtests/subscribe_test.go b/integrationtests/subscribe_test.go index c144bb2..6dd202f 100644 --- a/integrationtests/subscribe_test.go +++ b/integrationtests/subscribe_test.go @@ -2,6 +2,7 @@ package integrationtests import ( "context" + "sync/atomic" "testing" "time" @@ -113,14 +114,21 @@ func TestSubscribe(t *testing.T) { defer cancel() publisherCh := make(chan moqtransport.Publisher, 1) + var clientClosing atomic.Bool + clientClosing.Store(false) handler := moqtransport.HandlerFunc(func(w moqtransport.ResponseWriter, m *moqtransport.Message) { - assert.Equal(t, moqtransport.MessageSubscribe, m.Method) - assert.NotNil(t, w) - assert.NoError(t, w.Accept()) - publisher, ok := w.(moqtransport.Publisher) - assert.True(t, ok) - publisherCh <- publisher + if !clientClosing.Load() { + assert.Equal(t, moqtransport.MessageSubscribe, m.Method) + assert.NotNil(t, w) + assert.NoError(t, w.Accept()) + publisher, ok := w.(moqtransport.Publisher) + assert.True(t, ok) + publisherCh <- publisher + } else { + assert.Equal(t, moqtransport.MessageUnsubscribe, m.Method) + assert.Nil(t, w) + } }) _, _, _, ct, cancel := setup(t, sConn, cConn, handler) defer cancel() @@ -136,6 +144,7 @@ func TestSubscribe(t *testing.T) { assert.FailNow(t, "timeout while waiting for publisher") } + clientClosing.Store(true) assert.NoError(t, rt.Close()) time.Sleep(10 * time.Millisecond) diff --git a/session.go b/session.go index 8d0a43a..9449932 100644 --- a/session.go +++ b/session.go @@ -867,15 +867,17 @@ func (s *Session) onSubscribeUpdate(_ *wire.SubscribeUpdateMessage) error { return nil } -// TODO: Maybe don't immediately close the track and give app a chance to react -// first? func (s *Session) onUnsubscribe(msg *wire.UnsubscribeMessage) error { lt, ok := s.localTracks.findByID(msg.SubscribeID) if !ok { return errUnknownSubscribeID } lt.unsubscribe() - return nil + m := &Message{ + Method: MessageUnsubscribe, + SubscribeID: msg.SubscribeID, + } + return s.ctrlMsgReceiveQueue.enqueue(context.Background(), m) } func (s *Session) onSubscribeDone(msg *wire.SubscribeDoneMessage) error {