diff --git a/handler.go b/handler.go index 7d1a40a4..6807cc84 100644 --- a/handler.go +++ b/handler.go @@ -7,8 +7,6 @@ const ( MessageAnnounce = "ANNOUNCE" MessageAnnounceCancel = "ANNOUNCE_CANCEL" MessageUnannounce = "UNANNOUNCE" - MessageTrackStatusRequest = "TRACK_STATUS_REQUEST" - MessageTrackStatus = "TRACK_STATUS" MessageGoAway = "GO_AWAY" MessageSubscribeAnnounces = "SUBSCRIBE_ANNOUNCES" MessageUnsubscribeAnnounces = "UNSUBSCRIBE_ANNOUNCES" @@ -70,17 +68,6 @@ type FetchPublisher interface { FetchStream() (*FetchStream, error) } -// StatusRequestHandler is the interface implemented by ResponseWriters of -// TrackStatusRequest messages. The first call to Accept sends the response. -// Calling Reject sets the status to "track does not exist" and then calls -// Accept. Reject ignores the errorCode and reasonPhrase. Applications are -// responsible for following the ruls of track status messages. -type StatusRequestHandler interface { - // SetStatus sets the status for the response. Call this before calling - // Accept. - SetStatus(statusCode, lastGroupID, lastObjectID uint64) -} - // Handler is the handler interface for non-specific MoQ messages. type Handler interface { Handle(ResponseWriter, *Message) diff --git a/internal/wire/announce_ok_message.go b/internal/wire/announce_ok_message.go deleted file mode 100644 index 67683a21..00000000 --- a/internal/wire/announce_ok_message.go +++ /dev/null @@ -1,30 +0,0 @@ -package wire - -import ( - "log/slog" - - "github.com/quic-go/quic-go/quicvarint" -) - -type AnnounceOkMessage struct { - RequestID uint64 -} - -func (m *AnnounceOkMessage) LogValue() slog.Value { - return slog.GroupValue( - slog.String("type", "announce_ok"), - ) -} - -func (m AnnounceOkMessage) Type() controlMessageType { - return messageTypeAnnounceOk -} - -func (m *AnnounceOkMessage) Append(buf []byte) []byte { - return quicvarint.Append(buf, m.RequestID) -} - -func (m *AnnounceOkMessage) parse(_ Version, data []byte) (err error) { - m.RequestID, _, err = quicvarint.Parse(data) - return err -} diff --git a/internal/wire/control_message_parser.go b/internal/wire/control_message_parser.go index 15342da6..c83d2a03 100644 --- a/internal/wire/control_message_parser.go +++ b/internal/wire/control_message_parser.go @@ -58,7 +58,7 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) { m = &RequestsBlockedMessage{} case messageTypeSubscribe: - m = &SubscribeMessage{} + m = &SubscribeMessage{TrackStatus: false} case messageTypeSubscribeOk: m = &SubscribeOkMessage{} case messageTypeSubscribeError: @@ -67,8 +67,15 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) { m = &UnsubscribeMessage{} case messageTypeSubscribeUpdate: m = &SubscribeUpdateMessage{} - case messageTypeSubscribeDone: - m = &SubscribeDoneMessage{} + + case messageTypePublishDone: + m = &PublishDoneMessage{} + case messageTypePublish: + m = &PublishMessage{} + case messageTypePublishOk: + m = &PublishOkMessage{} + case messageTypePublishError: + m = &PublishErrorMessage{} case messageTypeFetch: m = &FetchMessage{} @@ -80,29 +87,31 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) { m = &FetchCancelMessage{} case messageTypeTrackStatus: - m = &TrackStatusRequestMessage{} + m = &SubscribeMessage{TrackStatus: true} case messageTypeTrackStatusOk: - m = &TrackStatusMessage{} + m = &SubscribeOkMessage{} + case messageTypeTrackStatusError: + m = &SubscribeErrorMessage{} - case messageTypeAnnounce: - m = &AnnounceMessage{} - case messageTypeAnnounceOk: - m = &AnnounceOkMessage{} - case messageTypeAnnounceError: - m = &AnnounceErrorMessage{} - case messageTypeUnannounce: - m = &UnannounceMessage{} - case messageTypeAnnounceCancel: - m = &AnnounceCancelMessage{} + case messageTypePublishNamespace: + m = &PublishNamespaceMessage{} + case messageTypePublishNamespaceOk: + m = &PublishNamespaceOkMessage{} + case messageTypePublishNamespaceError: + m = &PublishNamespaceErrorMessage{} + case messageTypePublishNamespaceDone: + m = &PublishNamespaceDoneMessage{} + case messageTypePublishNamespaceCancel: + m = &PublishNamespaceCancelMessage{} case messageTypeSubscribeNamespace: - m = &SubscribeAnnouncesMessage{} + m = &SubscribeNamespaceMessage{} case messageTypeSubscribeNamespaceOk: - m = &SubscribeAnnouncesOkMessage{} + m = &SubscribeNamespaceOkMessage{} case messageTypeSubscribeNamespaceError: - m = &SubscribeAnnouncesErrorMessage{} + m = &SubscribeNamespaceErrorMessage{} case messageTypeUnsubscribeNamespace: - m = &UnsubscribeAnnouncesMessage{} + m = &UnsubscribeNamespaceMessage{} default: return nil, fmt.Errorf("%w: %v", errInvalidMessageType, mt) } diff --git a/internal/wire/control_message_type.go b/internal/wire/control_message_type.go index a6bc1931..a1cd585e 100644 --- a/internal/wire/control_message_type.go +++ b/internal/wire/control_message_type.go @@ -22,8 +22,8 @@ const ( messageTypeSubscribeError controlMessageType = 0x05 messageTypeSubscribeUpdate controlMessageType = 0x02 messageTypeUnsubscribe controlMessageType = 0x0a - messageTypeSubscribeDone controlMessageType = 0x0b + messageTypePublishDone controlMessageType = 0x0b messageTypePublish controlMessageType = 0x1d messageTypePublishOk controlMessageType = 0x1e messageTypePublishError controlMessageType = 0x1f @@ -37,11 +37,11 @@ const ( messageTypeTrackStatusOk controlMessageType = 0x0e messageTypeTrackStatusError controlMessageType = 0x0f - messageTypeAnnounce controlMessageType = 0x06 - messageTypeAnnounceOk controlMessageType = 0x07 - messageTypeAnnounceError controlMessageType = 0x08 - messageTypeUnannounce controlMessageType = 0x09 - messageTypeAnnounceCancel controlMessageType = 0x0c + messageTypePublishNamespace controlMessageType = 0x06 + messageTypePublishNamespaceOk controlMessageType = 0x07 + messageTypePublishNamespaceError controlMessageType = 0x08 + messageTypePublishNamespaceDone controlMessageType = 0x09 + messageTypePublishNamespaceCancel controlMessageType = 0x0c messageTypeSubscribeNamespace controlMessageType = 0x11 messageTypeSubscribeNamespaceOk controlMessageType = 0x12 @@ -74,9 +74,9 @@ func (mt controlMessageType) String() string { return "Unsubscribe" case messageTypeSubscribeUpdate: return "SubscribeUpdate" - case messageTypeSubscribeDone: - return "SubscribeDone" + case messageTypePublishDone: + return "PublishDone" case messageTypePublish: return "Publish" case messageTypePublishOk: @@ -100,16 +100,16 @@ func (mt controlMessageType) String() string { case messageTypeTrackStatusError: return "TrackStatusError" - case messageTypeAnnounce: - return "Announce" - case messageTypeAnnounceOk: - return "AnnounceOk" - case messageTypeAnnounceError: - return "AnnounceError" - case messageTypeUnannounce: - return "Unannounce" - case messageTypeAnnounceCancel: - return "AnnounceCancel" + case messageTypePublishNamespace: + return "PublishNamespace" + case messageTypePublishNamespaceOk: + return "PublishNamespaceOk" + case messageTypePublishNamespaceError: + return "PublishNamespaceError" + case messageTypePublishNamespaceDone: + return "PublishNamespaceDone" + case messageTypePublishNamespaceCancel: + return "PublishNamespaceCancel" case messageTypeSubscribeNamespace: return "SubscribeNamespace" diff --git a/internal/wire/subscribe_done_message.go b/internal/wire/publish_done_message.go similarity index 73% rename from internal/wire/subscribe_done_message.go rename to internal/wire/publish_done_message.go index 7ff58e55..f2f54b7c 100644 --- a/internal/wire/subscribe_done_message.go +++ b/internal/wire/publish_done_message.go @@ -6,16 +6,16 @@ import ( "github.com/quic-go/quic-go/quicvarint" ) -type SubscribeDoneMessage struct { +type PublishDoneMessage struct { RequestID uint64 StatusCode uint64 StreamCount uint64 ReasonPhrase string } -func (m *SubscribeDoneMessage) LogValue() slog.Value { +func (m *PublishDoneMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "subscribe_done"), + slog.String("type", "publish_done"), slog.Uint64("request_id", m.RequestID), slog.Uint64("status_code", m.StatusCode), slog.Uint64("stream_count", m.StreamCount), @@ -23,11 +23,11 @@ func (m *SubscribeDoneMessage) LogValue() slog.Value { ) } -func (m SubscribeDoneMessage) Type() controlMessageType { - return messageTypeSubscribeDone +func (m PublishDoneMessage) Type() controlMessageType { + return messageTypePublishDone } -func (m *SubscribeDoneMessage) Append(buf []byte) []byte { +func (m *PublishDoneMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = quicvarint.Append(buf, m.StatusCode) buf = quicvarint.Append(buf, m.StreamCount) @@ -35,7 +35,7 @@ func (m *SubscribeDoneMessage) Append(buf []byte) []byte { return buf } -func (m *SubscribeDoneMessage) parse(_ Version, data []byte) (err error) { +func (m *PublishDoneMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/subscribe_done_message_test.go b/internal/wire/publish_done_message_test.go similarity index 85% rename from internal/wire/subscribe_done_message_test.go rename to internal/wire/publish_done_message_test.go index 03c8541f..826af950 100644 --- a/internal/wire/subscribe_done_message_test.go +++ b/internal/wire/publish_done_message_test.go @@ -10,12 +10,12 @@ import ( func TestSubscribeDoneMessageAppend(t *testing.T) { cases := []struct { - srm SubscribeDoneMessage + srm PublishDoneMessage buf []byte expect []byte }{ { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 0, StatusCode: 0, StreamCount: 0, @@ -25,7 +25,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { expect: []byte{0x00, 0x00, 0x00, 0x00}, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 0, StatusCode: 1, StreamCount: 2, @@ -40,7 +40,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { }, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 17, StatusCode: 1, StreamCount: 4, @@ -56,7 +56,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { }, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 0, StatusCode: 0, StreamCount: 0, @@ -66,7 +66,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { expect: []byte{0x00, 0x00, 0x00, 0x00}, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 0, StatusCode: 1, StreamCount: 2, @@ -81,7 +81,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { }, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 17, StatusCode: 1, StreamCount: 2, @@ -108,24 +108,24 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { func TestParseSubscribeDoneMessage(t *testing.T) { cases := []struct { data []byte - expect *SubscribeDoneMessage + expect *PublishDoneMessage err error }{ { data: nil, - expect: &SubscribeDoneMessage{}, + expect: &PublishDoneMessage{}, err: io.EOF, }, { data: []byte{}, - expect: &SubscribeDoneMessage{}, + expect: &PublishDoneMessage{}, err: io.EOF, }, { data: []byte{ 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, }, - expect: &SubscribeDoneMessage{ + expect: &PublishDoneMessage{ RequestID: 0, StatusCode: 0, StreamCount: 0, @@ -143,7 +143,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) { 0x02, 0x03, }, - expect: &SubscribeDoneMessage{ + expect: &PublishDoneMessage{ RequestID: 0, StatusCode: 1, StreamCount: 2, @@ -159,7 +159,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) { 0x06, 'r', 'e', 'a', 's', 'o', 'n', 0x00, }, - expect: &SubscribeDoneMessage{ + expect: &PublishDoneMessage{ RequestID: 0, StatusCode: 1, StreamCount: 2, @@ -171,7 +171,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) { data: []byte{ 0x00, 0x00, 0x00, 0x00, }, - expect: &SubscribeDoneMessage{ + expect: &PublishDoneMessage{ RequestID: 0, StatusCode: 0, StreamCount: 0, @@ -182,7 +182,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &SubscribeDoneMessage{} + res := &PublishDoneMessage{} err := res.parse(CurrentVersion, tc.data) assert.Equal(t, tc.expect, res) if tc.err != nil { diff --git a/internal/wire/publish_error_message.go b/internal/wire/publish_error_message.go index ed576a57..078bdc1e 100644 --- a/internal/wire/publish_error_message.go +++ b/internal/wire/publish_error_message.go @@ -14,7 +14,7 @@ type PublishErrorMessage struct { func (m *PublishErrorMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "subscribe_error"), + slog.String("type", "publish_error"), slog.Uint64("request_id", m.RequestID), slog.Uint64("error_code", m.ErrorCode), slog.String("reason", m.ReasonPhrase), diff --git a/internal/wire/announce_cancel_message.go b/internal/wire/publish_namespace_cancel_message.go similarity index 62% rename from internal/wire/announce_cancel_message.go rename to internal/wire/publish_namespace_cancel_message.go index 323a23e6..76ca829c 100644 --- a/internal/wire/announce_cancel_message.go +++ b/internal/wire/publish_namespace_cancel_message.go @@ -6,37 +6,37 @@ import ( "github.com/quic-go/quic-go/quicvarint" ) -type AnnounceCancelMessage struct { +type PublishNamespaceCancelMessage struct { TrackNamespace Tuple ErrorCode uint64 ReasonPhrase string } -func (m *AnnounceCancelMessage) LogValue() slog.Value { +func (m *PublishNamespaceCancelMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "announce_cancel"), + slog.String("type", "publish_namespace_cancel"), slog.Any("track_namespace", m.TrackNamespace), slog.Uint64("error_code", m.ErrorCode), slog.String("reason", m.ReasonPhrase), ) } -func (m AnnounceCancelMessage) GetTrackNamespace() string { +func (m PublishNamespaceCancelMessage) GetTrackNamespace() string { return m.TrackNamespace.String() } -func (m AnnounceCancelMessage) Type() controlMessageType { - return messageTypeAnnounce +func (m PublishNamespaceCancelMessage) Type() controlMessageType { + return messageTypePublishNamespace } -func (m *AnnounceCancelMessage) Append(buf []byte) []byte { +func (m *PublishNamespaceCancelMessage) Append(buf []byte) []byte { buf = m.TrackNamespace.append(buf) buf = quicvarint.Append(buf, m.ErrorCode) buf = appendVarIntBytes(buf, []byte(m.ReasonPhrase)) return buf } -func (m *AnnounceCancelMessage) parse(_ Version, data []byte) (err error) { +func (m *PublishNamespaceCancelMessage) parse(_ Version, data []byte) (err error) { var n int m.TrackNamespace, n, err = parseTuple(data) if err != nil { diff --git a/internal/wire/announce_cancel_message_test.go b/internal/wire/publish_namespace_cancel_message_test.go similarity index 83% rename from internal/wire/announce_cancel_message_test.go rename to internal/wire/publish_namespace_cancel_message_test.go index a61e6467..a2961963 100644 --- a/internal/wire/announce_cancel_message_test.go +++ b/internal/wire/publish_namespace_cancel_message_test.go @@ -10,12 +10,12 @@ import ( func TestAnnounceCancelMessageAppend(t *testing.T) { cases := []struct { - aom AnnounceCancelMessage + aom PublishNamespaceCancelMessage buf []byte expect []byte }{ { - aom: AnnounceCancelMessage{ + aom: PublishNamespaceCancelMessage{ TrackNamespace: []string{""}, ErrorCode: 1, ReasonPhrase: "reason", @@ -26,7 +26,7 @@ func TestAnnounceCancelMessageAppend(t *testing.T) { }, }, { - aom: AnnounceCancelMessage{ + aom: PublishNamespaceCancelMessage{ TrackNamespace: []string{"tracknamespace"}, ErrorCode: 1, ReasonPhrase: "reason", @@ -51,19 +51,19 @@ func TestAnnounceCancelMessageAppend(t *testing.T) { func TestParseAnnounceCancelMessage(t *testing.T) { cases := []struct { data []byte - expect *AnnounceCancelMessage + expect *PublishNamespaceCancelMessage err error }{ { data: nil, - expect: &AnnounceCancelMessage{}, + expect: &PublishNamespaceCancelMessage{}, err: io.EOF, }, { data: append( []byte{0x01, 0x0E}, append([]byte("tracknamespace"), 0x00, 0x00)..., ), - expect: &AnnounceCancelMessage{ + expect: &PublishNamespaceCancelMessage{ TrackNamespace: []string{"tracknamespace"}, ErrorCode: 0, ReasonPhrase: "", @@ -72,7 +72,7 @@ func TestParseAnnounceCancelMessage(t *testing.T) { }, { data: append([]byte{0x01, 0x05}, append([]byte("track"), []byte{0x01, 0x06, 'r', 'e', 'a', 's', 'o', 'n', 'p', 'h', 'r', 'a', 's', 'e'}...)...), - expect: &AnnounceCancelMessage{ + expect: &PublishNamespaceCancelMessage{ TrackNamespace: []string{"track"}, ErrorCode: 1, ReasonPhrase: "reason", @@ -81,7 +81,7 @@ func TestParseAnnounceCancelMessage(t *testing.T) { }, { data: append([]byte{0x01, 0x0F}, "tracknamespace"...), - expect: &AnnounceCancelMessage{ + expect: &PublishNamespaceCancelMessage{ TrackNamespace: []string{}, }, err: errLengthMismatch, @@ -89,7 +89,7 @@ func TestParseAnnounceCancelMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &AnnounceCancelMessage{} + res := &PublishNamespaceCancelMessage{} err := res.parse(CurrentVersion, tc.data) assert.Equal(t, tc.expect, res) if tc.err != nil { diff --git a/internal/wire/publish_namespace_done_message.go b/internal/wire/publish_namespace_done_message.go new file mode 100644 index 00000000..f404cea5 --- /dev/null +++ b/internal/wire/publish_namespace_done_message.go @@ -0,0 +1,30 @@ +package wire + +import ( + "log/slog" +) + +type PublishNamespaceDoneMessage struct { + TrackNamespace Tuple +} + +func (m *PublishNamespaceDoneMessage) LogValue() slog.Value { + return slog.GroupValue( + slog.String("type", "publish_namespace_done"), + slog.Any("track_namespace", m.TrackNamespace), + ) +} + +func (m PublishNamespaceDoneMessage) Type() controlMessageType { + return messageTypePublishNamespaceDone +} + +func (m *PublishNamespaceDoneMessage) Append(buf []byte) []byte { + buf = m.TrackNamespace.append(buf) + return buf +} + +func (p *PublishNamespaceDoneMessage) parse(_ Version, data []byte) (err error) { + p.TrackNamespace, _, err = parseTuple(data) + return err +} diff --git a/internal/wire/unannounce_message_test.go b/internal/wire/publish_namespace_done_message_test.go similarity index 80% rename from internal/wire/unannounce_message_test.go rename to internal/wire/publish_namespace_done_message_test.go index 47d984d2..1526f1be 100644 --- a/internal/wire/unannounce_message_test.go +++ b/internal/wire/publish_namespace_done_message_test.go @@ -10,12 +10,12 @@ import ( func TestUnannounceMessageAppend(t *testing.T) { cases := []struct { - uam UnannounceMessage + uam PublishNamespaceDoneMessage buf []byte expect []byte }{ { - uam: UnannounceMessage{ + uam: PublishNamespaceDoneMessage{ TrackNamespace: []string{""}, }, buf: []byte{}, @@ -24,7 +24,7 @@ func TestUnannounceMessageAppend(t *testing.T) { }, }, { - uam: UnannounceMessage{ + uam: PublishNamespaceDoneMessage{ TrackNamespace: []string{"tracknamespace"}, }, buf: []byte{0x0a, 0x0b}, @@ -42,31 +42,31 @@ func TestUnannounceMessageAppend(t *testing.T) { func TestParseUnannounceMessage(t *testing.T) { cases := []struct { data []byte - expect *UnannounceMessage + expect *PublishNamespaceDoneMessage err error }{ { data: nil, - expect: &UnannounceMessage{}, + expect: &PublishNamespaceDoneMessage{}, err: io.EOF, }, { data: append([]byte{0x01, 0x0E}, "tracknamespace"...), - expect: &UnannounceMessage{ + expect: &PublishNamespaceDoneMessage{ TrackNamespace: []string{"tracknamespace"}, }, err: nil, }, { data: append([]byte{0x01, 0x05}, "tracknamespace"...), - expect: &UnannounceMessage{ + expect: &PublishNamespaceDoneMessage{ TrackNamespace: []string{"track"}, }, err: nil, }, { data: append([]byte{0x01, 0x0F}, "tracknamespace"...), - expect: &UnannounceMessage{ + expect: &PublishNamespaceDoneMessage{ TrackNamespace: []string{}, }, err: errLengthMismatch, @@ -74,7 +74,7 @@ func TestParseUnannounceMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &UnannounceMessage{} + res := &PublishNamespaceDoneMessage{} err := res.parse(CurrentVersion, tc.data) if tc.err != nil { assert.Equal(t, tc.err, err) diff --git a/internal/wire/announce_error_message.go b/internal/wire/publish_namespace_error_message.go similarity index 63% rename from internal/wire/announce_error_message.go rename to internal/wire/publish_namespace_error_message.go index bbf21955..71a3146f 100644 --- a/internal/wire/announce_error_message.go +++ b/internal/wire/publish_namespace_error_message.go @@ -6,32 +6,32 @@ import ( "github.com/quic-go/quic-go/quicvarint" ) -type AnnounceErrorMessage struct { +type PublishNamespaceErrorMessage struct { RequestID uint64 ErrorCode uint64 ReasonPhrase string } -func (m *AnnounceErrorMessage) LogValue() slog.Value { +func (m *PublishNamespaceErrorMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "announce_error"), + slog.String("type", "publish_namespace_error"), slog.Uint64("error_code", m.ErrorCode), slog.String("reason", m.ReasonPhrase), ) } -func (m AnnounceErrorMessage) Type() controlMessageType { - return messageTypeAnnounceError +func (m PublishNamespaceErrorMessage) Type() controlMessageType { + return messageTypePublishNamespaceError } -func (m *AnnounceErrorMessage) Append(buf []byte) []byte { +func (m *PublishNamespaceErrorMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = quicvarint.Append(buf, m.ErrorCode) buf = appendVarIntBytes(buf, []byte(m.ReasonPhrase)) return buf } -func (m *AnnounceErrorMessage) parse(_ Version, data []byte) (err error) { +func (m *PublishNamespaceErrorMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/announce_error_message_test.go b/internal/wire/publish_namespace_error_message_test.go similarity index 82% rename from internal/wire/announce_error_message_test.go rename to internal/wire/publish_namespace_error_message_test.go index 685e7f8d..b628c97a 100644 --- a/internal/wire/announce_error_message_test.go +++ b/internal/wire/publish_namespace_error_message_test.go @@ -10,12 +10,12 @@ import ( func TestAnnounceErrorMessageAppend(t *testing.T) { cases := []struct { - aem AnnounceErrorMessage + aem PublishNamespaceErrorMessage buf []byte expect []byte }{ { - aem: AnnounceErrorMessage{ + aem: PublishNamespaceErrorMessage{ RequestID: 0, ErrorCode: 0, ReasonPhrase: "", @@ -26,7 +26,7 @@ func TestAnnounceErrorMessageAppend(t *testing.T) { }, }, { - aem: AnnounceErrorMessage{ + aem: PublishNamespaceErrorMessage{ RequestID: 1, ErrorCode: 1, ReasonPhrase: "reason", @@ -35,7 +35,7 @@ func TestAnnounceErrorMessageAppend(t *testing.T) { expect: append([]byte{0x01, 0x01, 0x06}, "reason"...), }, { - aem: AnnounceErrorMessage{ + aem: PublishNamespaceErrorMessage{ RequestID: 1, ErrorCode: 1, ReasonPhrase: "reason", @@ -55,17 +55,17 @@ func TestAnnounceErrorMessageAppend(t *testing.T) { func TestParseAnnounceErrorMessage(t *testing.T) { cases := []struct { data []byte - expect *AnnounceErrorMessage + expect *PublishNamespaceErrorMessage err error }{ { data: nil, - expect: &AnnounceErrorMessage{}, + expect: &PublishNamespaceErrorMessage{}, err: io.EOF, }, { data: []byte{0x01, 0x03, 0x03, 'e', 'r'}, - expect: &AnnounceErrorMessage{ + expect: &PublishNamespaceErrorMessage{ RequestID: 1, ErrorCode: 3, ReasonPhrase: "", @@ -74,7 +74,7 @@ func TestParseAnnounceErrorMessage(t *testing.T) { }, { data: append([]byte{0x00, 0x01, 0x0d}, "reason phrase"...), - expect: &AnnounceErrorMessage{ + expect: &PublishNamespaceErrorMessage{ RequestID: 0, ErrorCode: 1, ReasonPhrase: "reason phrase", @@ -84,7 +84,7 @@ func TestParseAnnounceErrorMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &AnnounceErrorMessage{} + res := &PublishNamespaceErrorMessage{} err := res.parse(CurrentVersion, tc.data) if tc.err != nil { assert.Equal(t, tc.err, err) diff --git a/internal/wire/announce_message.go b/internal/wire/publish_namespace_message.go similarity index 66% rename from internal/wire/announce_message.go rename to internal/wire/publish_namespace_message.go index 029edae8..92069c28 100644 --- a/internal/wire/announce_message.go +++ b/internal/wire/publish_namespace_message.go @@ -6,15 +6,15 @@ import ( "github.com/quic-go/quic-go/quicvarint" ) -type AnnounceMessage struct { +type PublishNamespaceMessage struct { RequestID uint64 TrackNamespace Tuple Parameters KVPList } -func (m *AnnounceMessage) LogValue() slog.Value { +func (m *PublishNamespaceMessage) LogValue() slog.Value { attrs := []slog.Attr{ - slog.String("type", "announce"), + slog.String("type", "publish_namespace"), slog.Any("track_namespace", m.TrackNamespace), slog.Uint64("number_of_parameters", uint64(len(m.Parameters))), } @@ -26,21 +26,21 @@ func (m *AnnounceMessage) LogValue() slog.Value { return slog.GroupValue(attrs...) } -func (m AnnounceMessage) GetTrackNamespace() string { +func (m PublishNamespaceMessage) GetTrackNamespace() string { return m.TrackNamespace.String() } -func (m AnnounceMessage) Type() controlMessageType { - return messageTypeAnnounce +func (m PublishNamespaceMessage) Type() controlMessageType { + return messageTypePublishNamespace } -func (m *AnnounceMessage) Append(buf []byte) []byte { +func (m *PublishNamespaceMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = m.TrackNamespace.append(buf) return m.Parameters.appendNum(buf) } -func (m *AnnounceMessage) parse(_ Version, data []byte) (err error) { +func (m *PublishNamespaceMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/announce_message_test.go b/internal/wire/publish_namespace_message_test.go similarity index 83% rename from internal/wire/announce_message_test.go rename to internal/wire/publish_namespace_message_test.go index adebd50e..d59e5404 100644 --- a/internal/wire/announce_message_test.go +++ b/internal/wire/publish_namespace_message_test.go @@ -10,12 +10,12 @@ import ( func TestAnnounceMessageAppend(t *testing.T) { cases := []struct { - am AnnounceMessage + am PublishNamespaceMessage buf []byte expect []byte }{ { - am: AnnounceMessage{ + am: PublishNamespaceMessage{ RequestID: 0, TrackNamespace: []string{""}, Parameters: KVPList{}, @@ -26,7 +26,7 @@ func TestAnnounceMessageAppend(t *testing.T) { }, }, { - am: AnnounceMessage{ + am: PublishNamespaceMessage{ RequestID: 1, TrackNamespace: []string{"tracknamespace"}, Parameters: KVPList{}, @@ -46,22 +46,22 @@ func TestAnnounceMessageAppend(t *testing.T) { func TestParseAnnounceMessage(t *testing.T) { cases := []struct { data []byte - expect *AnnounceMessage + expect *PublishNamespaceMessage err error }{ { data: nil, - expect: &AnnounceMessage{}, + expect: &PublishNamespaceMessage{}, err: io.EOF, }, { data: []byte{}, - expect: &AnnounceMessage{}, + expect: &PublishNamespaceMessage{}, err: io.EOF, }, { data: append(append([]byte{0x00, 0x01, 0x09}, "trackname"...), 0x00), - expect: &AnnounceMessage{ + expect: &PublishNamespaceMessage{ RequestID: 0, TrackNamespace: []string{"trackname"}, Parameters: KVPList{}, @@ -71,7 +71,7 @@ func TestParseAnnounceMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &AnnounceMessage{} + res := &PublishNamespaceMessage{} err := res.parse(CurrentVersion, tc.data) assert.Equal(t, tc.expect, res) if tc.err != nil { diff --git a/internal/wire/publish_namespace_ok_message.go b/internal/wire/publish_namespace_ok_message.go new file mode 100644 index 00000000..831189d0 --- /dev/null +++ b/internal/wire/publish_namespace_ok_message.go @@ -0,0 +1,30 @@ +package wire + +import ( + "log/slog" + + "github.com/quic-go/quic-go/quicvarint" +) + +type PublishNamespaceOkMessage struct { + RequestID uint64 +} + +func (m *PublishNamespaceOkMessage) LogValue() slog.Value { + return slog.GroupValue( + slog.String("type", "publish_namespace_ok"), + ) +} + +func (m PublishNamespaceOkMessage) Type() controlMessageType { + return messageTypePublishNamespaceOk +} + +func (m *PublishNamespaceOkMessage) Append(buf []byte) []byte { + return quicvarint.Append(buf, m.RequestID) +} + +func (m *PublishNamespaceOkMessage) parse(_ Version, data []byte) (err error) { + m.RequestID, _, err = quicvarint.Parse(data) + return err +} diff --git a/internal/wire/announce_ok_message_test.go b/internal/wire/publish_namespace_ok_message_test.go similarity index 77% rename from internal/wire/announce_ok_message_test.go rename to internal/wire/publish_namespace_ok_message_test.go index baadab6f..16c1b9c2 100644 --- a/internal/wire/announce_ok_message_test.go +++ b/internal/wire/publish_namespace_ok_message_test.go @@ -10,12 +10,12 @@ import ( func TestAnnounceOkMessageAppend(t *testing.T) { cases := []struct { - aom AnnounceOkMessage + aom PublishNamespaceOkMessage buf []byte expect []byte }{ { - aom: AnnounceOkMessage{ + aom: PublishNamespaceOkMessage{ RequestID: 1, }, buf: []byte{}, @@ -24,7 +24,7 @@ func TestAnnounceOkMessageAppend(t *testing.T) { }, }, { - aom: AnnounceOkMessage{ + aom: PublishNamespaceOkMessage{ RequestID: 1, }, buf: []byte{0x0a, 0x0b}, @@ -42,31 +42,31 @@ func TestAnnounceOkMessageAppend(t *testing.T) { func TestParseAnnounceOkMessage(t *testing.T) { cases := []struct { data []byte - expect *AnnounceOkMessage + expect *PublishNamespaceOkMessage err error }{ { data: nil, - expect: &AnnounceOkMessage{}, + expect: &PublishNamespaceOkMessage{}, err: io.EOF, }, { data: []byte{0x01}, - expect: &AnnounceOkMessage{ + expect: &PublishNamespaceOkMessage{ RequestID: 1, }, err: nil, }, { data: []byte{0x01}, - expect: &AnnounceOkMessage{ + expect: &PublishNamespaceOkMessage{ RequestID: 1, }, err: nil, }, { data: []byte{}, - expect: &AnnounceOkMessage{ + expect: &PublishNamespaceOkMessage{ RequestID: 0, }, err: io.EOF, @@ -74,7 +74,7 @@ func TestParseAnnounceOkMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &AnnounceOkMessage{} + res := &PublishNamespaceOkMessage{} err := res.parse(CurrentVersion, tc.data) assert.Equal(t, tc.expect, res) if tc.err != nil { diff --git a/internal/wire/subscribe_announces_ok_message.go b/internal/wire/subscribe_announces_ok_message.go deleted file mode 100644 index 0a43aba3..00000000 --- a/internal/wire/subscribe_announces_ok_message.go +++ /dev/null @@ -1,31 +0,0 @@ -package wire - -import ( - "log/slog" - - "github.com/quic-go/quic-go/quicvarint" -) - -// TODO: Add tests -type SubscribeAnnouncesOkMessage struct { - RequestID uint64 -} - -func (m *SubscribeAnnouncesOkMessage) LogValue() slog.Value { - return slog.GroupValue( - slog.String("type", "subscribe_announces_ok"), - ) -} - -func (m SubscribeAnnouncesOkMessage) Type() controlMessageType { - return messageTypeSubscribeNamespaceOk -} - -func (m *SubscribeAnnouncesOkMessage) Append(buf []byte) []byte { - return quicvarint.Append(buf, m.RequestID) -} - -func (m *SubscribeAnnouncesOkMessage) parse(_ Version, data []byte) (err error) { - m.RequestID, _, err = quicvarint.Parse(data) - return err -} diff --git a/internal/wire/subscribe_error_message.go b/internal/wire/subscribe_error_message.go index 4bdbc7a7..631f2ce6 100644 --- a/internal/wire/subscribe_error_message.go +++ b/internal/wire/subscribe_error_message.go @@ -7,6 +7,8 @@ import ( ) type SubscribeErrorMessage struct { + TrackStatus bool + RequestID uint64 ErrorCode uint64 ReasonPhrase string @@ -23,6 +25,9 @@ func (m *SubscribeErrorMessage) LogValue() slog.Value { } func (m SubscribeErrorMessage) Type() controlMessageType { + if m.TrackStatus { + return messageTypeTrackStatusError + } return messageTypeSubscribeError } diff --git a/internal/wire/subscribe_message.go b/internal/wire/subscribe_message.go index e3a742f1..953e5c25 100644 --- a/internal/wire/subscribe_message.go +++ b/internal/wire/subscribe_message.go @@ -70,6 +70,8 @@ func (g GroupOrder) String() string { } type SubscribeMessage struct { + TrackStatus bool + RequestID uint64 TrackNamespace Tuple TrackName []byte @@ -120,6 +122,9 @@ func (m *SubscribeMessage) LogValue() slog.Value { } func (m SubscribeMessage) Type() controlMessageType { + if m.TrackStatus { + return messageTypeTrackStatus + } return messageTypeSubscribe } diff --git a/internal/wire/subscribe_announces_error_message.go b/internal/wire/subscribe_namespace_error_message.go similarity index 70% rename from internal/wire/subscribe_announces_error_message.go rename to internal/wire/subscribe_namespace_error_message.go index 1be04c11..53fa1911 100644 --- a/internal/wire/subscribe_announces_error_message.go +++ b/internal/wire/subscribe_namespace_error_message.go @@ -7,31 +7,31 @@ import ( ) // TODO: Add tests -type SubscribeAnnouncesErrorMessage struct { +type SubscribeNamespaceErrorMessage struct { RequestID uint64 ErrorCode uint64 ReasonPhrase string } -func (m *SubscribeAnnouncesErrorMessage) LogValue() slog.Value { +func (m *SubscribeNamespaceErrorMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "subscribe_announces_error"), + slog.String("type", "subscribe_namespace_error"), slog.Uint64("error_code", m.ErrorCode), slog.String("reason", m.ReasonPhrase), ) } -func (m SubscribeAnnouncesErrorMessage) Type() controlMessageType { +func (m SubscribeNamespaceErrorMessage) Type() controlMessageType { return messageTypeSubscribeNamespaceError } -func (m *SubscribeAnnouncesErrorMessage) Append(buf []byte) []byte { +func (m *SubscribeNamespaceErrorMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = quicvarint.Append(buf, m.ErrorCode) return appendVarIntBytes(buf, []byte(m.ReasonPhrase)) } -func (m *SubscribeAnnouncesErrorMessage) parse(_ Version, data []byte) (err error) { +func (m *SubscribeNamespaceErrorMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/subscribe_announces_message.go b/internal/wire/subscribe_namespace_message.go similarity index 73% rename from internal/wire/subscribe_announces_message.go rename to internal/wire/subscribe_namespace_message.go index 23883812..32be114f 100644 --- a/internal/wire/subscribe_announces_message.go +++ b/internal/wire/subscribe_namespace_message.go @@ -7,15 +7,15 @@ import ( ) // TODO: Add tests -type SubscribeAnnouncesMessage struct { +type SubscribeNamespaceMessage struct { RequestID uint64 TrackNamespacePrefix Tuple Parameters KVPList } -func (m *SubscribeAnnouncesMessage) LogValue() slog.Value { +func (m *SubscribeNamespaceMessage) LogValue() slog.Value { attrs := []slog.Attr{ - slog.String("type", "subscribe_announces"), + slog.String("type", "subscribe_namespace"), slog.Any("track_namespace_prefix", m.TrackNamespacePrefix), slog.Uint64("number_of_parameters", uint64(len(m.Parameters))), } @@ -27,17 +27,17 @@ func (m *SubscribeAnnouncesMessage) LogValue() slog.Value { return slog.GroupValue(attrs...) } -func (m SubscribeAnnouncesMessage) Type() controlMessageType { +func (m SubscribeNamespaceMessage) Type() controlMessageType { return messageTypeSubscribeNamespace } -func (m *SubscribeAnnouncesMessage) Append(buf []byte) []byte { +func (m *SubscribeNamespaceMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = m.TrackNamespacePrefix.append(buf) return m.Parameters.appendNum(buf) } -func (m *SubscribeAnnouncesMessage) parse(_ Version, data []byte) (err error) { +func (m *SubscribeNamespaceMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/subscribe_namespace_ok_message.go b/internal/wire/subscribe_namespace_ok_message.go new file mode 100644 index 00000000..568cf5c8 --- /dev/null +++ b/internal/wire/subscribe_namespace_ok_message.go @@ -0,0 +1,31 @@ +package wire + +import ( + "log/slog" + + "github.com/quic-go/quic-go/quicvarint" +) + +// TODO: Add tests +type SubscribeNamespaceOkMessage struct { + RequestID uint64 +} + +func (m *SubscribeNamespaceOkMessage) LogValue() slog.Value { + return slog.GroupValue( + slog.String("type", "subscribe_namespace_ok"), + ) +} + +func (m SubscribeNamespaceOkMessage) Type() controlMessageType { + return messageTypeSubscribeNamespaceOk +} + +func (m *SubscribeNamespaceOkMessage) Append(buf []byte) []byte { + return quicvarint.Append(buf, m.RequestID) +} + +func (m *SubscribeNamespaceOkMessage) parse(_ Version, data []byte) (err error) { + m.RequestID, _, err = quicvarint.Parse(data) + return err +} diff --git a/internal/wire/subscribe_ok_message.go b/internal/wire/subscribe_ok_message.go index 5fd6eb8a..38ef4a86 100644 --- a/internal/wire/subscribe_ok_message.go +++ b/internal/wire/subscribe_ok_message.go @@ -8,6 +8,8 @@ import ( ) type SubscribeOkMessage struct { + TrackStatus bool + RequestID uint64 TrackAlias uint64 Expires time.Duration @@ -49,6 +51,9 @@ func (m *SubscribeOkMessage) LogValue() slog.Value { } func (m SubscribeOkMessage) Type() controlMessageType { + if m.TrackStatus { + return messageTypeTrackStatusOk + } return messageTypeSubscribeOk } diff --git a/internal/wire/subscribe_update_message.go b/internal/wire/subscribe_update_message.go index 341085f2..edae6c4c 100644 --- a/internal/wire/subscribe_update_message.go +++ b/internal/wire/subscribe_update_message.go @@ -7,12 +7,13 @@ import ( ) type SubscribeUpdateMessage struct { - RequestID uint64 - StartLocation Location - EndGroup uint64 - SubscriberPriority uint8 - Forward uint8 - Parameters KVPList + RequestID uint64 + SubscriptionRequestID uint64 + StartLocation Location + EndGroup uint64 + SubscriberPriority uint8 + Forward uint8 + Parameters KVPList } func (m *SubscribeUpdateMessage) LogValue() slog.Value { @@ -39,6 +40,7 @@ func (m SubscribeUpdateMessage) Type() controlMessageType { func (m *SubscribeUpdateMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) + buf = quicvarint.Append(buf, m.SubscriptionRequestID) buf = m.StartLocation.append(buf) buf = quicvarint.Append(buf, m.EndGroup) buf = append(buf, m.SubscriberPriority) @@ -55,6 +57,12 @@ func (m *SubscribeUpdateMessage) parse(v Version, data []byte) (err error) { } data = data[n:] + m.SubscriptionRequestID, n, err = quicvarint.Parse(data) + if err != nil { + return err + } + data = data[n:] + n, err = m.StartLocation.parse(v, data) if err != nil { return err diff --git a/internal/wire/subscribe_update_message_test.go b/internal/wire/subscribe_update_message_test.go index 674ce856..ecd29264 100644 --- a/internal/wire/subscribe_update_message_test.go +++ b/internal/wire/subscribe_update_message_test.go @@ -16,7 +16,8 @@ func TestSubscribeUpdateMessageAppend(t *testing.T) { }{ { sum: SubscribeUpdateMessage{ - RequestID: 0, + RequestID: 0, + SubscriptionRequestID: 0, StartLocation: Location{ Group: 0, Object: 0, @@ -28,12 +29,13 @@ func TestSubscribeUpdateMessageAppend(t *testing.T) { }, buf: []byte{}, expect: []byte{ - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, }, }, { sum: SubscribeUpdateMessage{ - RequestID: 1, + RequestID: 1, + SubscriptionRequestID: 9, StartLocation: Location{ Group: 2, Object: 3, @@ -44,11 +46,12 @@ func TestSubscribeUpdateMessageAppend(t *testing.T) { Parameters: KVPList{KeyValuePair{Type: PathParameterKey, ValueBytes: []byte("A")}}, }, buf: []byte{}, - expect: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'A'}, + expect: []byte{0x01, 0x09, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'A'}, }, { sum: SubscribeUpdateMessage{ - RequestID: 1, + RequestID: 1, + SubscriptionRequestID: 9, StartLocation: Location{ Group: 2, Object: 3, @@ -59,7 +62,7 @@ func TestSubscribeUpdateMessageAppend(t *testing.T) { Parameters: KVPList{KeyValuePair{Type: PathParameterKey, ValueBytes: []byte("A")}}, }, buf: []byte{0x0a, 0x0b}, - expect: []byte{0x0a, 0x0b, 0x01, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'A'}, + expect: []byte{0x0a, 0x0b, 0x01, 0x09, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'A'}, }, } for i, tc := range cases { @@ -87,9 +90,10 @@ func TestParseSubscribeUpdateMessage(t *testing.T) { err: io.EOF, }, { - data: []byte{0x00, 0x01, 0x02}, + data: []byte{0x00, 0x09, 0x01, 0x02}, expect: &SubscribeUpdateMessage{ - RequestID: 0, + RequestID: 0, + SubscriptionRequestID: 9, StartLocation: Location{ Group: 1, Object: 2, @@ -102,9 +106,10 @@ func TestParseSubscribeUpdateMessage(t *testing.T) { err: io.EOF, }, { - data: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'P'}, + data: []byte{0x01, 0x09, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'P'}, expect: &SubscribeUpdateMessage{ - RequestID: 1, + RequestID: 1, + SubscriptionRequestID: 9, StartLocation: Location{ Group: 2, Object: 3, diff --git a/internal/wire/track_status_message.go b/internal/wire/track_status_message.go deleted file mode 100644 index 489d5c7c..00000000 --- a/internal/wire/track_status_message.go +++ /dev/null @@ -1,58 +0,0 @@ -package wire - -import ( - "log/slog" - - "github.com/quic-go/quic-go/quicvarint" -) - -type TrackStatusMessage struct { - RequestID uint64 - StatusCode uint64 - LargestLocation Location - Parameters KVPList -} - -func (m *TrackStatusMessage) LogValue() slog.Value { - return slog.GroupValue( - slog.String("type", "track_status"), - slog.Uint64("status_code", m.StatusCode), - slog.Uint64("last_group_id", m.LargestLocation.Group), - slog.Uint64("last_object_id", m.LargestLocation.Object), - ) -} - -func (m TrackStatusMessage) Type() controlMessageType { - return messageTypeTrackStatusOk -} - -func (m *TrackStatusMessage) Append(buf []byte) []byte { - buf = quicvarint.Append(buf, m.RequestID) - buf = quicvarint.Append(buf, m.StatusCode) - buf = m.LargestLocation.append(buf) - return m.Parameters.appendNum(buf) -} - -func (m *TrackStatusMessage) parse(v Version, data []byte) (err error) { - var n int - m.RequestID, n, err = quicvarint.Parse(data) - if err != nil { - return - } - data = data[n:] - - m.StatusCode, n, err = quicvarint.Parse(data) - if err != nil { - return - } - data = data[n:] - - n, err = m.LargestLocation.parse(v, data) - if err != nil { - return - } - data = data[n:] - - m.Parameters = KVPList{} - return m.Parameters.parseNum(data) -} diff --git a/internal/wire/track_status_message_test.go b/internal/wire/track_status_message_test.go deleted file mode 100644 index 4cf6a578..00000000 --- a/internal/wire/track_status_message_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package wire - -import ( - "fmt" - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestTrackStatusMessageAppend(t *testing.T) { - cases := []struct { - tsm TrackStatusMessage - buf []byte - expect []byte - }{ - { - tsm: TrackStatusMessage{ - RequestID: 0, - StatusCode: 0, - LargestLocation: Location{ - Group: 0, - Object: 0, - }, - }, - buf: []byte{}, - expect: []byte{0x00, 0x00, 0x00, 0x00, 0x00}, - }, - { - tsm: TrackStatusMessage{ - RequestID: 1, - StatusCode: 2, - LargestLocation: Location{ - Group: 1, - Object: 2, - }, - Parameters: KVPList{}, - }, - buf: []byte{0x0a, 0x0b}, - expect: []byte{0x0a, 0x0b, 0x01, 0x02, 0x01, 0x02, 0x00}, - }, - } - for i, tc := range cases { - t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := tc.tsm.Append(tc.buf) - assert.Equal(t, tc.expect, res) - }) - } -} - -func TestParseTrackStatusMessage(t *testing.T) { - cases := []struct { - data []byte - expect *TrackStatusMessage - err error - }{ - { - data: nil, - expect: &TrackStatusMessage{}, - err: io.EOF, - }, - { - data: []byte{}, - expect: &TrackStatusMessage{}, - err: io.EOF, - }, - { - data: []byte{0x01, 0x02, 0x03, 0x04, 0x00}, - expect: &TrackStatusMessage{ - RequestID: 1, - StatusCode: 2, - LargestLocation: Location{ - Group: 3, - Object: 4, - }, - Parameters: KVPList{}, - }, - err: nil, - }, - } - for i, tc := range cases { - t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &TrackStatusMessage{} - err := res.parse(CurrentVersion, tc.data) - assert.Equal(t, tc.expect, res) - if tc.err != nil { - assert.Equal(t, tc.err, err) - } else { - assert.NoError(t, err) - } - }) - } -} diff --git a/internal/wire/track_status_request_message.go b/internal/wire/track_status_request_message.go deleted file mode 100644 index 08d8bfc5..00000000 --- a/internal/wire/track_status_request_message.go +++ /dev/null @@ -1,62 +0,0 @@ -package wire - -import ( - "log/slog" - - "github.com/mengelbart/qlog" - "github.com/quic-go/quic-go/quicvarint" -) - -type TrackStatusRequestMessage struct { - RequestID uint64 - TrackNamespace Tuple - TrackName []byte - Parameters KVPList -} - -func (m *TrackStatusRequestMessage) LogValue() slog.Value { - return slog.GroupValue( - slog.String("type", "track_status_request"), - slog.Any("track_namespace", m.TrackNamespace), - slog.Any("track_name", qlog.RawInfo{ - Length: uint64(len(m.TrackName)), - PayloadLength: uint64(len(m.TrackName)), - Data: []byte(m.TrackName), - }), - ) -} - -func (m TrackStatusRequestMessage) Type() controlMessageType { - return messageTypeTrackStatus -} - -func (m *TrackStatusRequestMessage) Append(buf []byte) []byte { - buf = quicvarint.Append(buf, m.RequestID) - buf = m.TrackNamespace.append(buf) - buf = appendVarIntBytes(buf, []byte(m.TrackName)) - return m.Parameters.appendNum(buf) -} - -func (m *TrackStatusRequestMessage) parse(_ Version, data []byte) (err error) { - var n int - m.RequestID, n, err = quicvarint.Parse(data) - if err != nil { - return - } - data = data[n:] - - m.TrackNamespace, n, err = parseTuple(data) - if err != nil { - return - } - data = data[n:] - - m.TrackName, n, err = parseVarIntBytes(data) - if err != nil { - return err - } - data = data[n:] - - m.Parameters = KVPList{} - return m.Parameters.parseNum(data) -} diff --git a/internal/wire/track_status_request_message_test.go b/internal/wire/track_status_request_message_test.go deleted file mode 100644 index 34cedab7..00000000 --- a/internal/wire/track_status_request_message_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package wire - -import ( - "fmt" - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestTrackStatusRequestMessageAppend(t *testing.T) { - cases := []struct { - aom TrackStatusRequestMessage - buf []byte - expect []byte - }{ - { - aom: TrackStatusRequestMessage{ - RequestID: 0, - TrackNamespace: []string{""}, - TrackName: []byte(""), - Parameters: KVPList{}, - }, - buf: []byte{}, - expect: []byte{ - 0x00, 0x01, 0x00, 0x00, 0x00, - }, - }, - { - aom: TrackStatusRequestMessage{ - RequestID: 0, - TrackNamespace: []string{"tracknamespace"}, - TrackName: []byte("track"), - Parameters: KVPList{}, - }, - buf: []byte{0x0a, 0x0b}, - expect: []byte{0x0a, 0x0b, 0x00, 0x01, 0x0e, 't', 'r', 'a', 'c', 'k', 'n', 'a', 'm', 'e', 's', 'p', 'a', 'c', 'e', 0x05, 't', 'r', 'a', 'c', 'k', 0x00}, - }, - } - for i, tc := range cases { - t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := tc.aom.Append(tc.buf) - assert.Equal(t, tc.expect, res) - }) - } -} - -func TestParseTrackStatusRequestMessage(t *testing.T) { - cases := []struct { - data []byte - expect *TrackStatusRequestMessage - err error - }{ - { - data: nil, - expect: &TrackStatusRequestMessage{}, - err: io.EOF, - }, - { - data: []byte{0x00, 0x01, 0x0e, 't', 'r', 'a', 'c', 'k', 'n', 'a', 'm', 'e', 's', 'p', 'a', 'c', 'e', 0x05, 't', 'r', 'a', 'c', 'k', 0x00}, - expect: &TrackStatusRequestMessage{ - RequestID: 0, - TrackNamespace: []string{"tracknamespace"}, - TrackName: []byte("track"), - Parameters: KVPList{}, - }, - err: nil, - }, - { - data: append([]byte{0x00, 0x10}, append([]byte("tracknamespace"), 0x00)...), - expect: &TrackStatusRequestMessage{ - RequestID: 0, - TrackNamespace: []string{}, - TrackName: nil, - Parameters: nil, - }, - err: errLengthMismatch, - }, - } - for i, tc := range cases { - t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &TrackStatusRequestMessage{} - err := res.parse(CurrentVersion, tc.data) - assert.Equal(t, tc.expect, res) - if tc.err != nil { - assert.Equal(t, tc.err, err) - } else { - assert.NoError(t, err) - } - }) - } -} diff --git a/internal/wire/unannounce_message.go b/internal/wire/unannounce_message.go deleted file mode 100644 index 232b7ea8..00000000 --- a/internal/wire/unannounce_message.go +++ /dev/null @@ -1,30 +0,0 @@ -package wire - -import ( - "log/slog" -) - -type UnannounceMessage struct { - TrackNamespace Tuple -} - -func (m *UnannounceMessage) LogValue() slog.Value { - return slog.GroupValue( - slog.String("type", "unannounce"), - slog.Any("track_namespace", m.TrackNamespace), - ) -} - -func (m UnannounceMessage) Type() controlMessageType { - return messageTypeUnannounce -} - -func (m *UnannounceMessage) Append(buf []byte) []byte { - buf = m.TrackNamespace.append(buf) - return buf -} - -func (p *UnannounceMessage) parse(_ Version, data []byte) (err error) { - p.TrackNamespace, _, err = parseTuple(data) - return err -} diff --git a/internal/wire/unsubscribe_announces_message.go b/internal/wire/unsubscribe_namespace_message.go similarity index 50% rename from internal/wire/unsubscribe_announces_message.go rename to internal/wire/unsubscribe_namespace_message.go index 634224db..9dd0669b 100644 --- a/internal/wire/unsubscribe_announces_message.go +++ b/internal/wire/unsubscribe_namespace_message.go @@ -5,26 +5,26 @@ import ( ) // TODO: Add tests -type UnsubscribeAnnouncesMessage struct { +type UnsubscribeNamespaceMessage struct { TrackNamespacePrefix Tuple } -func (m *UnsubscribeAnnouncesMessage) LogValue() slog.Value { +func (m *UnsubscribeNamespaceMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "unsubscribe_announces"), + slog.String("type", "unsubscribe_namespace"), slog.Any("track_namespace_prefix", m.TrackNamespacePrefix), ) } -func (m UnsubscribeAnnouncesMessage) Type() controlMessageType { +func (m UnsubscribeNamespaceMessage) Type() controlMessageType { return messageTypeUnsubscribeNamespace } -func (m *UnsubscribeAnnouncesMessage) Append(buf []byte) []byte { +func (m *UnsubscribeNamespaceMessage) Append(buf []byte) []byte { return m.TrackNamespacePrefix.append(buf) } -func (m *UnsubscribeAnnouncesMessage) parse(_ Version, data []byte) (err error) { +func (m *UnsubscribeNamespaceMessage) parse(_ Version, data []byte) (err error) { m.TrackNamespacePrefix, _, err = parseTuple(data) return err } diff --git a/messages.go b/messages.go index 438cd2c7..42b83904 100644 --- a/messages.go +++ b/messages.go @@ -131,6 +131,8 @@ func (kvpl KVPList) GetAuthorizationToken() ([]byte, bool) { // SubscribeOptions contains options for subscribing to a track with full control // over all subscribe message parameters. type SubscribeOptions struct { + TrackStatus bool + // SubscriberPriority indicates the delivery priority (0-255, higher is more important) SubscriberPriority uint8 diff --git a/session.go b/session.go index ec950ee8..deba3cc8 100644 --- a/session.go +++ b/session.go @@ -26,7 +26,6 @@ var ( errUnknownTrackAlias = errors.New("unknown track alias") errMissingPathParameter = errors.New("missing path parameter") errUnexpectedPathParameter = errors.New("unexpected path parameter on QUIC connection") - errUnknownTrackStatusRequest = errors.New("got unexpected track status requrest") ) type controlMessageStream interface { @@ -85,8 +84,6 @@ type Session struct { trackAliases *sequence remoteTracks *remoteTrackMap localTracks *localTrackMap - - outgoingTrackStatusRequests *trackStatusRequestMap } func (s *Session) Run(conn Connection) error { @@ -119,7 +116,6 @@ func (s *Session) Run(conn Connection) error { s.trackAliases = newSequence(0, 1) s.remoteTracks = newRemoteTrackMap() s.localTracks = newLocalTrackMap() - s.outgoingTrackStatusRequests = newTrackStatusRequestMap() s.controlStream = &controlStream{ stream: cs, logger: defaultLogger.With("perspective", conn.Perspective()), @@ -401,6 +397,12 @@ func WithSubscribeParameters(parameters KVPList) SubscribeOption { } } +func TrackStatus() SubscribeOption { + return func(so *SubscribeOptions) { + so.TrackStatus = true + } +} + // SubscribeUpdateOption is a functional option for configuring SUBSCRIBE_UPDATE requests. type SubscribeUpdateOption func(*SubscribeUpdateOptions) @@ -520,6 +522,7 @@ func (s *Session) Subscribe( } cm := &wire.SubscribeMessage{ + TrackStatus: opts.TrackStatus, RequestID: requestID, TrackNamespace: namespace, TrackName: []byte(name), @@ -649,7 +652,7 @@ func (s *Session) subscriptionDone(id, code, count uint64, reason string) error if !ok { return errUnknownRequestID } - return s.controlStream.write(&wire.SubscribeDoneMessage{ + return s.controlStream.write(&wire.PublishDoneMessage{ RequestID: lt.requestID, StatusCode: code, StreamCount: count, @@ -755,44 +758,6 @@ func (s *Session) fetchCancel(id uint64) error { }) } -func (s *Session) RequestTrackStatus(ctx context.Context, namespace []string, track string) (*TrackStatus, error) { - requestID, err := s.getRequestID() - if err != nil { - return nil, err - } - tsr := &trackStatusRequest{ - requestID: requestID, - namespace: namespace, - trackname: track, - response: make(chan *TrackStatus, 1), - } - - s.outgoingTrackStatusRequests.add(tsr) - tsrm := &wire.TrackStatusRequestMessage{ - TrackNamespace: namespace, - TrackName: []byte(track), - } - if err := s.controlStream.write(tsrm); err != nil { - _, _ = s.outgoingTrackStatusRequests.delete(tsrm.RequestID) - return nil, err - } - select { - case <-ctx.Done(): - return nil, context.Cause(ctx) - case status := <-tsr.response: - return status, nil - } -} - -func (s *Session) sendTrackStatus(ts TrackStatus) error { - return s.controlStream.write(&wire.TrackStatusMessage{ - StatusCode: ts.StatusCode, - RequestID: 0, - LargestLocation: wire.Location{}, - Parameters: wire.KVPList{}, - }) -} - // Announce announces namespace to the peer. It blocks until a response from the // peer was received or ctx is cancelled and returns an error if the // announcement was rejected. @@ -808,7 +773,7 @@ func (s *Session) Announce(ctx context.Context, namespace []string) error { response: make(chan error, 1), } s.outgoingAnnouncements.add(a) - am := &wire.AnnounceMessage{ + am := &wire.PublishNamespaceMessage{ RequestID: a.requestID, TrackNamespace: a.namespace, Parameters: a.parameters, @@ -829,7 +794,7 @@ func (s *Session) acceptAnnouncement(requestID uint64) error { if _, err := s.incomingAnnouncements.confirmAndGet(requestID); err != nil { return err } - if err := s.controlStream.write(&wire.AnnounceOkMessage{ + if err := s.controlStream.write(&wire.PublishNamespaceOkMessage{ RequestID: requestID, }); err != nil { return err @@ -838,7 +803,7 @@ func (s *Session) acceptAnnouncement(requestID uint64) error { } func (s *Session) rejectAnnouncement(requestID uint64, c uint64, r string) error { - return s.controlStream.write(&wire.AnnounceErrorMessage{ + return s.controlStream.write(&wire.PublishNamespaceErrorMessage{ RequestID: requestID, ErrorCode: c, ReasonPhrase: r, @@ -849,7 +814,7 @@ func (s *Session) Unannounce(ctx context.Context, namespace []string) error { if ok := s.outgoingAnnouncements.delete(namespace); ok { return errUnknownAnnouncementNamespace } - u := &wire.UnannounceMessage{ + u := &wire.PublishNamespaceDoneMessage{ TrackNamespace: namespace, } return s.controlStream.write(u) @@ -859,7 +824,7 @@ func (s *Session) AnnounceCancel(ctx context.Context, namespace []string, errorC if !s.incomingAnnouncements.delete(namespace) { return errUnknownAnnouncementNamespace } - acm := &wire.AnnounceCancelMessage{ + acm := &wire.PublishNamespaceCancelMessage{ TrackNamespace: namespace, ErrorCode: errorCode, ReasonPhrase: reason, @@ -880,7 +845,7 @@ func (s *Session) SubscribeAnnouncements(ctx context.Context, prefix []string) e response: make(chan announcementSubscriptionResponse, 1), } s.pendingOutgointAnnouncementSubscriptions.add(as) - sam := &wire.SubscribeAnnouncesMessage{ + sam := &wire.SubscribeNamespaceMessage{ RequestID: as.requestID, TrackNamespacePrefix: as.namespace, Parameters: wire.KVPList{}, @@ -898,13 +863,13 @@ func (s *Session) SubscribeAnnouncements(ctx context.Context, prefix []string) e } func (s *Session) acceptAnnouncementSubscription(requestID uint64) error { - return s.controlStream.write(&wire.SubscribeAnnouncesOkMessage{ + return s.controlStream.write(&wire.SubscribeNamespaceOkMessage{ RequestID: requestID, }) } func (s *Session) rejectAnnouncementSubscription(requestID uint64, c uint64, r string) error { - return s.controlStream.write(&wire.SubscribeAnnouncesErrorMessage{ + return s.controlStream.write(&wire.SubscribeNamespaceErrorMessage{ RequestID: requestID, ErrorCode: c, ReasonPhrase: r, @@ -913,7 +878,7 @@ func (s *Session) rejectAnnouncementSubscription(requestID uint64, c uint64, r s func (s *Session) UnsubscribeAnnouncements(ctx context.Context, namespace []string) error { s.pendingOutgointAnnouncementSubscriptions.delete(namespace) - uam := &wire.UnsubscribeAnnouncesMessage{ + uam := &wire.UnsubscribeNamespaceMessage{ TrackNamespacePrefix: namespace, } return s.controlStream.write(uam) @@ -936,12 +901,15 @@ func (s *Session) receive(msg wire.ControlMessage) error { var err error switch m := msg.(type) { + case *wire.GoAwayMessage: s.onGoAway(m) + case *wire.MaxRequestIDMessage: err = s.onMaxRequestID(m) case *wire.RequestsBlockedMessage: err = s.onRequestsBlocked(m) + case *wire.SubscribeMessage: err = s.onSubscribe(m) case *wire.SubscribeOkMessage: @@ -952,8 +920,16 @@ func (s *Session) receive(msg wire.ControlMessage) error { err = s.onSubscribeUpdate(m) case *wire.UnsubscribeMessage: err = s.onUnsubscribe(m) - case *wire.SubscribeDoneMessage: + + case *wire.PublishDoneMessage: err = s.onSubscribeDone(m) + case *wire.PublishMessage: + panic("TODO") + case *wire.PublishOkMessage: + panic("TODO") + case *wire.PublishErrorMessage: + panic("TODO") + case *wire.FetchMessage: err = s.onFetch(m) case *wire.FetchOkMessage: @@ -962,28 +938,27 @@ func (s *Session) receive(msg wire.ControlMessage) error { err = s.onFetchError(m) case *wire.FetchCancelMessage: err = s.onFetchCancel(m) - case *wire.TrackStatusRequestMessage: - err = s.onTrackStatusRequest(m) - case *wire.TrackStatusMessage: - err = s.onTrackStatus(m) - case *wire.AnnounceMessage: + + case *wire.PublishNamespaceMessage: err = s.onAnnounce(m) - case *wire.AnnounceOkMessage: + case *wire.PublishNamespaceOkMessage: err = s.onAnnounceOk(m) - case *wire.AnnounceErrorMessage: + case *wire.PublishNamespaceErrorMessage: err = s.onAnnounceError(m) - case *wire.UnannounceMessage: + case *wire.PublishNamespaceDoneMessage: err = s.onUnannounce(m) - case *wire.AnnounceCancelMessage: + case *wire.PublishNamespaceCancelMessage: err = s.onAnnounceCancel(m) - case *wire.SubscribeAnnouncesMessage: + + case *wire.SubscribeNamespaceMessage: err = s.onSubscribeAnnounces(m) - case *wire.SubscribeAnnouncesOkMessage: + case *wire.SubscribeNamespaceOkMessage: err = s.onSubscribeAnnouncesOk(m) - case *wire.SubscribeAnnouncesErrorMessage: + case *wire.SubscribeNamespaceErrorMessage: err = s.onSubscribeAnnouncesError(m) - case *wire.UnsubscribeAnnouncesMessage: + case *wire.UnsubscribeNamespaceMessage: s.onUnsubscribeAnnounces(m) + default: err = errUnexpectedMessageType } @@ -1092,26 +1067,32 @@ func (s *Session) onSubscribe(msg *wire.SubscribeMessage) error { EndGroup: nil, Parameters: FromWire(msg.Parameters), } - lt := newLocalTrack(s.conn, m.RequestID, s.trackAliases.next(), func(code, count uint64, reason string) error { - return s.subscriptionDone(m.RequestID, code, count, reason) - }, s.Qlogger) - - if err := s.addLocalTrack(lt); err != nil { - code := ErrorCodeInternal - reason := "internal" - if err == errMaxRequestIDViolated { - code = ErrorCodeTooManyRequests - reason = "too many subscribes" + var lt *localTrack + trackAlias := uint64(0) + if !msg.TrackStatus { + trackAlias = s.trackAliases.next() + lt = newLocalTrack(s.conn, m.RequestID, trackAlias, func(code, count uint64, reason string) error { + return s.subscriptionDone(m.RequestID, code, count, reason) + }, s.Qlogger) + + if err := s.addLocalTrack(lt); err != nil { + code := ErrorCodeInternal + reason := "internal" + if err == errMaxRequestIDViolated { + code = ErrorCodeTooManyRequests + reason = "too many subscribes" + } + return s.controlStream.write(&wire.SubscribeErrorMessage{ + RequestID: lt.requestID, + ErrorCode: uint64(code), + ReasonPhrase: reason, + }) } - return s.controlStream.write(&wire.SubscribeErrorMessage{ - RequestID: lt.requestID, - ErrorCode: uint64(code), - ReasonPhrase: reason, - }) } + srw := &SubscribeResponseWriter{ id: m.RequestID, - trackAlias: lt.trackAlias, + trackAlias: trackAlias, session: s, localTrack: lt, handled: false, @@ -1218,7 +1199,7 @@ func (s *Session) onUnsubscribe(msg *wire.UnsubscribeMessage) error { return nil } -func (s *Session) onSubscribeDone(msg *wire.SubscribeDoneMessage) error { +func (s *Session) onSubscribeDone(msg *wire.PublishDoneMessage) error { sub, ok := s.remoteTracks.findByRequestID(msg.RequestID) if !ok { return errUnknownRequestID @@ -1304,52 +1285,7 @@ func (s *Session) onFetchCancel(msg *wire.FetchCancelMessage) error { return nil } -func (s *Session) onTrackStatusRequest(msg *wire.TrackStatusRequestMessage) error { - if len(msg.TrackNamespace) == 0 || len(msg.TrackNamespace) > 32 { - return errInvalidNamespaceLength - } - tsrw := &trackStatusResponseWriter{ - session: s, - handled: false, - status: TrackStatus{ - Namespace: msg.TrackNamespace, - Trackname: string(msg.TrackName), - StatusCode: 0, - LastGroupID: 0, - LastObjectID: 0, - }, - } - s.Handler.Handle(tsrw, &Message{ - Method: MessageTrackStatusRequest, - Namespace: msg.TrackNamespace, - Track: string(msg.TrackName), - }) - if !tsrw.handled { - return tsrw.Reject(0, "") - } - return nil -} - -func (s *Session) onTrackStatus(msg *wire.TrackStatusMessage) error { - tsr, ok := s.outgoingTrackStatusRequests.delete(msg.RequestID) - if !ok { - return errUnknownTrackStatusRequest - } - select { - case tsr.response <- &TrackStatus{ - Namespace: tsr.namespace, - Trackname: tsr.trackname, - StatusCode: msg.StatusCode, - LastGroupID: msg.LargestLocation.Group, - LastObjectID: msg.LargestLocation.Object, - }: - default: - s.logger.Info("dropping unhandled track status") - } - return nil -} - -func (s *Session) onAnnounce(msg *wire.AnnounceMessage) error { +func (s *Session) onAnnounce(msg *wire.PublishNamespaceMessage) error { if len(msg.TrackNamespace) == 0 || len(msg.TrackNamespace) > 32 { return errInvalidNamespaceLength } @@ -1377,7 +1313,7 @@ func (s *Session) onAnnounce(msg *wire.AnnounceMessage) error { return nil } -func (s *Session) onAnnounceOk(msg *wire.AnnounceOkMessage) error { +func (s *Session) onAnnounceOk(msg *wire.PublishNamespaceOkMessage) error { announcement, err := s.outgoingAnnouncements.confirmAndGet(msg.RequestID) if err != nil { return errUnknownAnnouncement @@ -1390,7 +1326,7 @@ func (s *Session) onAnnounceOk(msg *wire.AnnounceOkMessage) error { return nil } -func (s *Session) onAnnounceError(msg *wire.AnnounceErrorMessage) error { +func (s *Session) onAnnounceError(msg *wire.PublishNamespaceErrorMessage) error { announcement, ok := s.outgoingAnnouncements.reject(msg.RequestID) if !ok { return errUnknownAnnouncement @@ -1406,7 +1342,7 @@ func (s *Session) onAnnounceError(msg *wire.AnnounceErrorMessage) error { return nil } -func (s *Session) onUnannounce(msg *wire.UnannounceMessage) error { +func (s *Session) onUnannounce(msg *wire.PublishNamespaceDoneMessage) error { if len(msg.TrackNamespace) == 0 || len(msg.TrackNamespace) > 32 { return errInvalidNamespaceLength } @@ -1420,7 +1356,7 @@ func (s *Session) onUnannounce(msg *wire.UnannounceMessage) error { return nil } -func (s *Session) onAnnounceCancel(msg *wire.AnnounceCancelMessage) error { +func (s *Session) onAnnounceCancel(msg *wire.PublishNamespaceCancelMessage) error { if len(msg.TrackNamespace) == 0 || len(msg.TrackNamespace) > 32 { return errInvalidNamespaceLength } @@ -1433,7 +1369,7 @@ func (s *Session) onAnnounceCancel(msg *wire.AnnounceCancelMessage) error { return nil } -func (s *Session) onSubscribeAnnounces(msg *wire.SubscribeAnnouncesMessage) error { +func (s *Session) onSubscribeAnnounces(msg *wire.SubscribeNamespaceMessage) error { s.pendingIncomingAnnouncementSubscriptions.add(&announcementSubscription{ requestID: msg.RequestID, namespace: msg.TrackNamespacePrefix, @@ -1455,7 +1391,7 @@ func (s *Session) onSubscribeAnnounces(msg *wire.SubscribeAnnouncesMessage) erro return nil } -func (s *Session) onSubscribeAnnouncesOk(msg *wire.SubscribeAnnouncesOkMessage) error { +func (s *Session) onSubscribeAnnouncesOk(msg *wire.SubscribeNamespaceOkMessage) error { as, ok := s.pendingOutgointAnnouncementSubscriptions.deleteByID(msg.RequestID) if !ok { return errUnknownSubscribeAnnouncesPrefix @@ -1470,7 +1406,7 @@ func (s *Session) onSubscribeAnnouncesOk(msg *wire.SubscribeAnnouncesOkMessage) return nil } -func (s *Session) onSubscribeAnnouncesError(msg *wire.SubscribeAnnouncesErrorMessage) error { +func (s *Session) onSubscribeAnnouncesError(msg *wire.SubscribeNamespaceErrorMessage) error { as, ok := s.pendingOutgointAnnouncementSubscriptions.deleteByID(msg.RequestID) if !ok { return errUnknownSubscribeAnnouncesPrefix @@ -1488,7 +1424,7 @@ func (s *Session) onSubscribeAnnouncesError(msg *wire.SubscribeAnnouncesErrorMes return nil } -func (s *Session) onUnsubscribeAnnounces(msg *wire.UnsubscribeAnnouncesMessage) { +func (s *Session) onUnsubscribeAnnounces(msg *wire.UnsubscribeNamespaceMessage) { s.Handler.Handle(nil, &Message{ Method: MessageUnsubscribeAnnounces, Namespace: msg.TrackNamespacePrefix, diff --git a/session_test.go b/session_test.go index 07196d4e..2ffe2f85 100644 --- a/session_test.go +++ b/session_test.go @@ -41,7 +41,6 @@ func newSessionWithHandlers(conn Connection, cs controlMessageStream, h Handler, highestRequestsBlocked: atomic.Uint64{}, remoteTracks: newRemoteTrackMap(), localTracks: newLocalTrackMap(), - outgoingTrackStatusRequests: newTrackStatusRequestMap(), localMaxRequestID: atomic.Uint64{}, trackAliases: newSequence(0, 1), } @@ -371,12 +370,12 @@ func TestSession(t *testing.T) { s := newSession(conn, cs, nil) s.handshakeDone.Store(true) - cs.EXPECT().write(&wire.AnnounceMessage{ + cs.EXPECT().write(&wire.PublishNamespaceMessage{ RequestID: 0, TrackNamespace: []string{"namespace"}, Parameters: wire.KVPList{}, }).DoAndReturn(func(_ wire.ControlMessage) error { - err := s.receive(&wire.AnnounceOkMessage{ + err := s.receive(&wire.PublishNamespaceOkMessage{ RequestID: 0, }) assert.NoError(t, err) @@ -404,10 +403,10 @@ func TestSession(t *testing.T) { }).DoAndReturn(func(rw ResponseWriter, req *Message) { assert.NoError(t, rw.Accept()) }) - cs.EXPECT().write(&wire.AnnounceOkMessage{ + cs.EXPECT().write(&wire.PublishNamespaceOkMessage{ RequestID: 2, }) - err := s.receive(&wire.AnnounceMessage{ + err := s.receive(&wire.PublishNamespaceMessage{ RequestID: 2, TrackNamespace: []string{"namespace"}, Parameters: wire.KVPList{}, diff --git a/track_status_request.go b/track_status_request.go deleted file mode 100644 index 83ade0b4..00000000 --- a/track_status_request.go +++ /dev/null @@ -1,17 +0,0 @@ -package moqtransport - -type TrackStatus struct { - Namespace []string - Trackname string - StatusCode uint64 - LastGroupID uint64 - LastObjectID uint64 -} - -type trackStatusRequest struct { - requestID uint64 - namespace []string - trackname string - - response chan *TrackStatus -} diff --git a/track_status_request_map.go b/track_status_request_map.go deleted file mode 100644 index 525e1002..00000000 --- a/track_status_request_map.go +++ /dev/null @@ -1,30 +0,0 @@ -package moqtransport - -import ( - "sync" -) - -type trackStatusRequestMap struct { - lock sync.Mutex - requests map[uint64]*trackStatusRequest -} - -func newTrackStatusRequestMap() *trackStatusRequestMap { - return &trackStatusRequestMap{ - lock: sync.Mutex{}, - requests: map[uint64]*trackStatusRequest{}, - } -} - -func (m *trackStatusRequestMap) add(tsr *trackStatusRequest) { - m.lock.Lock() - defer m.lock.Unlock() - m.requests[tsr.requestID] = tsr -} - -func (m *trackStatusRequestMap) delete(requestID uint64) (*trackStatusRequest, bool) { - m.lock.Lock() - defer m.lock.Unlock() - tsr, ok := m.requests[requestID] - return tsr, ok -} diff --git a/track_status_response_writer.go b/track_status_response_writer.go deleted file mode 100644 index fe4bde25..00000000 --- a/track_status_response_writer.go +++ /dev/null @@ -1,29 +0,0 @@ -package moqtransport - -type trackStatusResponseWriter struct { - session *Session - handled bool - status TrackStatus -} - -// Accept commits the status and sends a response to the peer. -func (w *trackStatusResponseWriter) Accept() error { - w.handled = true - return w.session.sendTrackStatus(w.status) -} - -// Reject sends a track does not exist status -func (w *trackStatusResponseWriter) Reject(uint64, string) error { - w.handled = true - w.status.StatusCode = TrackStatusDoesNotExist - w.status.LastGroupID = 0 - w.status.LastObjectID = 0 - return w.Accept() -} - -// SetStatus implements StatusRequestHandler. -func (w *trackStatusResponseWriter) SetStatus(statusCode uint64, lastGroupID uint64, lastObjectID uint64) { - w.status.StatusCode = statusCode - w.status.LastGroupID = lastGroupID - w.status.LastObjectID = lastObjectID -}