From 31744d452273d5b970597d69e37e51f28b7385da Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Wed, 17 Sep 2025 11:47:29 +0200 Subject: [PATCH] Rename and update messages to draft-14 --- internal/wire/announce_ok_message.go | 30 ---------- internal/wire/control_message_parser.go | 32 +++++----- internal/wire/control_message_type.go | 36 +++++------ ...one_message.go => publish_done_message.go} | 14 ++--- ...e_test.go => publish_done_message_test.go} | 30 +++++----- internal/wire/publish_error_message.go | 2 +- ...go => publish_namespace_cancel_message.go} | 16 ++--- ... publish_namespace_cancel_message_test.go} | 18 +++--- .../wire/publish_namespace_done_message.go | 30 ++++++++++ ...=> publish_namespace_done_message_test.go} | 18 +++--- ....go => publish_namespace_error_message.go} | 14 ++--- ...> publish_namespace_error_message_test.go} | 18 +++--- ...essage.go => publish_namespace_message.go} | 16 ++--- ...t.go => publish_namespace_message_test.go} | 16 ++--- internal/wire/publish_namespace_ok_message.go | 30 ++++++++++ ...o => publish_namespace_ok_message_test.go} | 18 +++--- .../wire/subscribe_announces_ok_message.go | 31 ---------- ...o => subscribe_namespace_error_message.go} | 12 ++-- ...sage.go => subscribe_namespace_message.go} | 12 ++-- .../wire/subscribe_namespace_ok_message.go | 31 ++++++++++ internal/wire/subscribe_update_message.go | 20 +++++-- .../wire/subscribe_update_message_test.go | 25 ++++---- internal/wire/unannounce_message.go | 30 ---------- ...ge.go => unsubscribe_namespace_message.go} | 12 ++-- session.go | 60 +++++++++---------- session_test.go | 8 +-- 26 files changed, 296 insertions(+), 283 deletions(-) delete mode 100644 internal/wire/announce_ok_message.go rename internal/wire/{subscribe_done_message.go => publish_done_message.go} (73%) rename internal/wire/{subscribe_done_message_test.go => publish_done_message_test.go} (85%) rename internal/wire/{announce_cancel_message.go => publish_namespace_cancel_message.go} (62%) rename internal/wire/{announce_cancel_message_test.go => publish_namespace_cancel_message_test.go} (83%) create mode 100644 internal/wire/publish_namespace_done_message.go rename internal/wire/{unannounce_message_test.go => publish_namespace_done_message_test.go} (80%) rename internal/wire/{announce_error_message.go => publish_namespace_error_message.go} (63%) rename internal/wire/{announce_error_message_test.go => publish_namespace_error_message_test.go} (82%) rename internal/wire/{announce_message.go => publish_namespace_message.go} (66%) rename internal/wire/{announce_message_test.go => publish_namespace_message_test.go} (83%) create mode 100644 internal/wire/publish_namespace_ok_message.go rename internal/wire/{announce_ok_message_test.go => publish_namespace_ok_message_test.go} (77%) delete mode 100644 internal/wire/subscribe_announces_ok_message.go rename internal/wire/{subscribe_announces_error_message.go => subscribe_namespace_error_message.go} (70%) rename internal/wire/{subscribe_announces_message.go => subscribe_namespace_message.go} (73%) create mode 100644 internal/wire/subscribe_namespace_ok_message.go delete mode 100644 internal/wire/unannounce_message.go rename internal/wire/{unsubscribe_announces_message.go => unsubscribe_namespace_message.go} (50%) diff --git a/internal/wire/announce_ok_message.go b/internal/wire/announce_ok_message.go deleted file mode 100644 index 67683a21..00000000 --- a/internal/wire/announce_ok_message.go +++ /dev/null @@ -1,30 +0,0 @@ -package wire - -import ( - "log/slog" - - "github.com/quic-go/quic-go/quicvarint" -) - -type AnnounceOkMessage struct { - RequestID uint64 -} - -func (m *AnnounceOkMessage) LogValue() slog.Value { - return slog.GroupValue( - slog.String("type", "announce_ok"), - ) -} - -func (m AnnounceOkMessage) Type() controlMessageType { - return messageTypeAnnounceOk -} - -func (m *AnnounceOkMessage) Append(buf []byte) []byte { - return quicvarint.Append(buf, m.RequestID) -} - -func (m *AnnounceOkMessage) parse(_ Version, data []byte) (err error) { - m.RequestID, _, err = quicvarint.Parse(data) - return err -} diff --git a/internal/wire/control_message_parser.go b/internal/wire/control_message_parser.go index 15342da6..6baf6d66 100644 --- a/internal/wire/control_message_parser.go +++ b/internal/wire/control_message_parser.go @@ -67,8 +67,8 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) { m = &UnsubscribeMessage{} case messageTypeSubscribeUpdate: m = &SubscribeUpdateMessage{} - case messageTypeSubscribeDone: - m = &SubscribeDoneMessage{} + case messageTypePublishDone: + m = &PublishDoneMessage{} case messageTypeFetch: m = &FetchMessage{} @@ -84,25 +84,25 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) { case messageTypeTrackStatusOk: m = &TrackStatusMessage{} - case messageTypeAnnounce: - m = &AnnounceMessage{} - case messageTypeAnnounceOk: - m = &AnnounceOkMessage{} - case messageTypeAnnounceError: - m = &AnnounceErrorMessage{} - case messageTypeUnannounce: - m = &UnannounceMessage{} - case messageTypeAnnounceCancel: - m = &AnnounceCancelMessage{} + case messageTypePublishNamespace: + m = &PublishNamespaceMessage{} + case messageTypePublishNamespaceOk: + m = &PublishNamespaceOkMessage{} + case messageTypePublishNamespaceError: + m = &PublishNamespaceErrorMessage{} + case messageTypePublishNamespaceDone: + m = &PublishNamespaceDoneMessage{} + case messageTypePublishNamespaceCancel: + m = &PublishNamespaceCancelMessage{} case messageTypeSubscribeNamespace: - m = &SubscribeAnnouncesMessage{} + m = &SubscribeNamespaceMessage{} case messageTypeSubscribeNamespaceOk: - m = &SubscribeAnnouncesOkMessage{} + m = &SubscribeNamespaceOkMessage{} case messageTypeSubscribeNamespaceError: - m = &SubscribeAnnouncesErrorMessage{} + m = &SubscribeNamespaceErrorMessage{} case messageTypeUnsubscribeNamespace: - m = &UnsubscribeAnnouncesMessage{} + m = &UnsubscribeNamespaceMessage{} default: return nil, fmt.Errorf("%w: %v", errInvalidMessageType, mt) } diff --git a/internal/wire/control_message_type.go b/internal/wire/control_message_type.go index a6bc1931..a1cd585e 100644 --- a/internal/wire/control_message_type.go +++ b/internal/wire/control_message_type.go @@ -22,8 +22,8 @@ const ( messageTypeSubscribeError controlMessageType = 0x05 messageTypeSubscribeUpdate controlMessageType = 0x02 messageTypeUnsubscribe controlMessageType = 0x0a - messageTypeSubscribeDone controlMessageType = 0x0b + messageTypePublishDone controlMessageType = 0x0b messageTypePublish controlMessageType = 0x1d messageTypePublishOk controlMessageType = 0x1e messageTypePublishError controlMessageType = 0x1f @@ -37,11 +37,11 @@ const ( messageTypeTrackStatusOk controlMessageType = 0x0e messageTypeTrackStatusError controlMessageType = 0x0f - messageTypeAnnounce controlMessageType = 0x06 - messageTypeAnnounceOk controlMessageType = 0x07 - messageTypeAnnounceError controlMessageType = 0x08 - messageTypeUnannounce controlMessageType = 0x09 - messageTypeAnnounceCancel controlMessageType = 0x0c + messageTypePublishNamespace controlMessageType = 0x06 + messageTypePublishNamespaceOk controlMessageType = 0x07 + messageTypePublishNamespaceError controlMessageType = 0x08 + messageTypePublishNamespaceDone controlMessageType = 0x09 + messageTypePublishNamespaceCancel controlMessageType = 0x0c messageTypeSubscribeNamespace controlMessageType = 0x11 messageTypeSubscribeNamespaceOk controlMessageType = 0x12 @@ -74,9 +74,9 @@ func (mt controlMessageType) String() string { return "Unsubscribe" case messageTypeSubscribeUpdate: return "SubscribeUpdate" - case messageTypeSubscribeDone: - return "SubscribeDone" + case messageTypePublishDone: + return "PublishDone" case messageTypePublish: return "Publish" case messageTypePublishOk: @@ -100,16 +100,16 @@ func (mt controlMessageType) String() string { case messageTypeTrackStatusError: return "TrackStatusError" - case messageTypeAnnounce: - return "Announce" - case messageTypeAnnounceOk: - return "AnnounceOk" - case messageTypeAnnounceError: - return "AnnounceError" - case messageTypeUnannounce: - return "Unannounce" - case messageTypeAnnounceCancel: - return "AnnounceCancel" + case messageTypePublishNamespace: + return "PublishNamespace" + case messageTypePublishNamespaceOk: + return "PublishNamespaceOk" + case messageTypePublishNamespaceError: + return "PublishNamespaceError" + case messageTypePublishNamespaceDone: + return "PublishNamespaceDone" + case messageTypePublishNamespaceCancel: + return "PublishNamespaceCancel" case messageTypeSubscribeNamespace: return "SubscribeNamespace" diff --git a/internal/wire/subscribe_done_message.go b/internal/wire/publish_done_message.go similarity index 73% rename from internal/wire/subscribe_done_message.go rename to internal/wire/publish_done_message.go index 7ff58e55..f2f54b7c 100644 --- a/internal/wire/subscribe_done_message.go +++ b/internal/wire/publish_done_message.go @@ -6,16 +6,16 @@ import ( "github.com/quic-go/quic-go/quicvarint" ) -type SubscribeDoneMessage struct { +type PublishDoneMessage struct { RequestID uint64 StatusCode uint64 StreamCount uint64 ReasonPhrase string } -func (m *SubscribeDoneMessage) LogValue() slog.Value { +func (m *PublishDoneMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "subscribe_done"), + slog.String("type", "publish_done"), slog.Uint64("request_id", m.RequestID), slog.Uint64("status_code", m.StatusCode), slog.Uint64("stream_count", m.StreamCount), @@ -23,11 +23,11 @@ func (m *SubscribeDoneMessage) LogValue() slog.Value { ) } -func (m SubscribeDoneMessage) Type() controlMessageType { - return messageTypeSubscribeDone +func (m PublishDoneMessage) Type() controlMessageType { + return messageTypePublishDone } -func (m *SubscribeDoneMessage) Append(buf []byte) []byte { +func (m *PublishDoneMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = quicvarint.Append(buf, m.StatusCode) buf = quicvarint.Append(buf, m.StreamCount) @@ -35,7 +35,7 @@ func (m *SubscribeDoneMessage) Append(buf []byte) []byte { return buf } -func (m *SubscribeDoneMessage) parse(_ Version, data []byte) (err error) { +func (m *PublishDoneMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/subscribe_done_message_test.go b/internal/wire/publish_done_message_test.go similarity index 85% rename from internal/wire/subscribe_done_message_test.go rename to internal/wire/publish_done_message_test.go index 03c8541f..826af950 100644 --- a/internal/wire/subscribe_done_message_test.go +++ b/internal/wire/publish_done_message_test.go @@ -10,12 +10,12 @@ import ( func TestSubscribeDoneMessageAppend(t *testing.T) { cases := []struct { - srm SubscribeDoneMessage + srm PublishDoneMessage buf []byte expect []byte }{ { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 0, StatusCode: 0, StreamCount: 0, @@ -25,7 +25,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { expect: []byte{0x00, 0x00, 0x00, 0x00}, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 0, StatusCode: 1, StreamCount: 2, @@ -40,7 +40,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { }, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 17, StatusCode: 1, StreamCount: 4, @@ -56,7 +56,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { }, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 0, StatusCode: 0, StreamCount: 0, @@ -66,7 +66,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { expect: []byte{0x00, 0x00, 0x00, 0x00}, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 0, StatusCode: 1, StreamCount: 2, @@ -81,7 +81,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { }, }, { - srm: SubscribeDoneMessage{ + srm: PublishDoneMessage{ RequestID: 17, StatusCode: 1, StreamCount: 2, @@ -108,24 +108,24 @@ func TestSubscribeDoneMessageAppend(t *testing.T) { func TestParseSubscribeDoneMessage(t *testing.T) { cases := []struct { data []byte - expect *SubscribeDoneMessage + expect *PublishDoneMessage err error }{ { data: nil, - expect: &SubscribeDoneMessage{}, + expect: &PublishDoneMessage{}, err: io.EOF, }, { data: []byte{}, - expect: &SubscribeDoneMessage{}, + expect: &PublishDoneMessage{}, err: io.EOF, }, { data: []byte{ 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, }, - expect: &SubscribeDoneMessage{ + expect: &PublishDoneMessage{ RequestID: 0, StatusCode: 0, StreamCount: 0, @@ -143,7 +143,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) { 0x02, 0x03, }, - expect: &SubscribeDoneMessage{ + expect: &PublishDoneMessage{ RequestID: 0, StatusCode: 1, StreamCount: 2, @@ -159,7 +159,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) { 0x06, 'r', 'e', 'a', 's', 'o', 'n', 0x00, }, - expect: &SubscribeDoneMessage{ + expect: &PublishDoneMessage{ RequestID: 0, StatusCode: 1, StreamCount: 2, @@ -171,7 +171,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) { data: []byte{ 0x00, 0x00, 0x00, 0x00, }, - expect: &SubscribeDoneMessage{ + expect: &PublishDoneMessage{ RequestID: 0, StatusCode: 0, StreamCount: 0, @@ -182,7 +182,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &SubscribeDoneMessage{} + res := &PublishDoneMessage{} err := res.parse(CurrentVersion, tc.data) assert.Equal(t, tc.expect, res) if tc.err != nil { diff --git a/internal/wire/publish_error_message.go b/internal/wire/publish_error_message.go index ed576a57..078bdc1e 100644 --- a/internal/wire/publish_error_message.go +++ b/internal/wire/publish_error_message.go @@ -14,7 +14,7 @@ type PublishErrorMessage struct { func (m *PublishErrorMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "subscribe_error"), + slog.String("type", "publish_error"), slog.Uint64("request_id", m.RequestID), slog.Uint64("error_code", m.ErrorCode), slog.String("reason", m.ReasonPhrase), diff --git a/internal/wire/announce_cancel_message.go b/internal/wire/publish_namespace_cancel_message.go similarity index 62% rename from internal/wire/announce_cancel_message.go rename to internal/wire/publish_namespace_cancel_message.go index 323a23e6..76ca829c 100644 --- a/internal/wire/announce_cancel_message.go +++ b/internal/wire/publish_namespace_cancel_message.go @@ -6,37 +6,37 @@ import ( "github.com/quic-go/quic-go/quicvarint" ) -type AnnounceCancelMessage struct { +type PublishNamespaceCancelMessage struct { TrackNamespace Tuple ErrorCode uint64 ReasonPhrase string } -func (m *AnnounceCancelMessage) LogValue() slog.Value { +func (m *PublishNamespaceCancelMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "announce_cancel"), + slog.String("type", "publish_namespace_cancel"), slog.Any("track_namespace", m.TrackNamespace), slog.Uint64("error_code", m.ErrorCode), slog.String("reason", m.ReasonPhrase), ) } -func (m AnnounceCancelMessage) GetTrackNamespace() string { +func (m PublishNamespaceCancelMessage) GetTrackNamespace() string { return m.TrackNamespace.String() } -func (m AnnounceCancelMessage) Type() controlMessageType { - return messageTypeAnnounce +func (m PublishNamespaceCancelMessage) Type() controlMessageType { + return messageTypePublishNamespace } -func (m *AnnounceCancelMessage) Append(buf []byte) []byte { +func (m *PublishNamespaceCancelMessage) Append(buf []byte) []byte { buf = m.TrackNamespace.append(buf) buf = quicvarint.Append(buf, m.ErrorCode) buf = appendVarIntBytes(buf, []byte(m.ReasonPhrase)) return buf } -func (m *AnnounceCancelMessage) parse(_ Version, data []byte) (err error) { +func (m *PublishNamespaceCancelMessage) parse(_ Version, data []byte) (err error) { var n int m.TrackNamespace, n, err = parseTuple(data) if err != nil { diff --git a/internal/wire/announce_cancel_message_test.go b/internal/wire/publish_namespace_cancel_message_test.go similarity index 83% rename from internal/wire/announce_cancel_message_test.go rename to internal/wire/publish_namespace_cancel_message_test.go index a61e6467..a2961963 100644 --- a/internal/wire/announce_cancel_message_test.go +++ b/internal/wire/publish_namespace_cancel_message_test.go @@ -10,12 +10,12 @@ import ( func TestAnnounceCancelMessageAppend(t *testing.T) { cases := []struct { - aom AnnounceCancelMessage + aom PublishNamespaceCancelMessage buf []byte expect []byte }{ { - aom: AnnounceCancelMessage{ + aom: PublishNamespaceCancelMessage{ TrackNamespace: []string{""}, ErrorCode: 1, ReasonPhrase: "reason", @@ -26,7 +26,7 @@ func TestAnnounceCancelMessageAppend(t *testing.T) { }, }, { - aom: AnnounceCancelMessage{ + aom: PublishNamespaceCancelMessage{ TrackNamespace: []string{"tracknamespace"}, ErrorCode: 1, ReasonPhrase: "reason", @@ -51,19 +51,19 @@ func TestAnnounceCancelMessageAppend(t *testing.T) { func TestParseAnnounceCancelMessage(t *testing.T) { cases := []struct { data []byte - expect *AnnounceCancelMessage + expect *PublishNamespaceCancelMessage err error }{ { data: nil, - expect: &AnnounceCancelMessage{}, + expect: &PublishNamespaceCancelMessage{}, err: io.EOF, }, { data: append( []byte{0x01, 0x0E}, append([]byte("tracknamespace"), 0x00, 0x00)..., ), - expect: &AnnounceCancelMessage{ + expect: &PublishNamespaceCancelMessage{ TrackNamespace: []string{"tracknamespace"}, ErrorCode: 0, ReasonPhrase: "", @@ -72,7 +72,7 @@ func TestParseAnnounceCancelMessage(t *testing.T) { }, { data: append([]byte{0x01, 0x05}, append([]byte("track"), []byte{0x01, 0x06, 'r', 'e', 'a', 's', 'o', 'n', 'p', 'h', 'r', 'a', 's', 'e'}...)...), - expect: &AnnounceCancelMessage{ + expect: &PublishNamespaceCancelMessage{ TrackNamespace: []string{"track"}, ErrorCode: 1, ReasonPhrase: "reason", @@ -81,7 +81,7 @@ func TestParseAnnounceCancelMessage(t *testing.T) { }, { data: append([]byte{0x01, 0x0F}, "tracknamespace"...), - expect: &AnnounceCancelMessage{ + expect: &PublishNamespaceCancelMessage{ TrackNamespace: []string{}, }, err: errLengthMismatch, @@ -89,7 +89,7 @@ func TestParseAnnounceCancelMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &AnnounceCancelMessage{} + res := &PublishNamespaceCancelMessage{} err := res.parse(CurrentVersion, tc.data) assert.Equal(t, tc.expect, res) if tc.err != nil { diff --git a/internal/wire/publish_namespace_done_message.go b/internal/wire/publish_namespace_done_message.go new file mode 100644 index 00000000..f404cea5 --- /dev/null +++ b/internal/wire/publish_namespace_done_message.go @@ -0,0 +1,30 @@ +package wire + +import ( + "log/slog" +) + +type PublishNamespaceDoneMessage struct { + TrackNamespace Tuple +} + +func (m *PublishNamespaceDoneMessage) LogValue() slog.Value { + return slog.GroupValue( + slog.String("type", "publish_namespace_done"), + slog.Any("track_namespace", m.TrackNamespace), + ) +} + +func (m PublishNamespaceDoneMessage) Type() controlMessageType { + return messageTypePublishNamespaceDone +} + +func (m *PublishNamespaceDoneMessage) Append(buf []byte) []byte { + buf = m.TrackNamespace.append(buf) + return buf +} + +func (p *PublishNamespaceDoneMessage) parse(_ Version, data []byte) (err error) { + p.TrackNamespace, _, err = parseTuple(data) + return err +} diff --git a/internal/wire/unannounce_message_test.go b/internal/wire/publish_namespace_done_message_test.go similarity index 80% rename from internal/wire/unannounce_message_test.go rename to internal/wire/publish_namespace_done_message_test.go index 47d984d2..1526f1be 100644 --- a/internal/wire/unannounce_message_test.go +++ b/internal/wire/publish_namespace_done_message_test.go @@ -10,12 +10,12 @@ import ( func TestUnannounceMessageAppend(t *testing.T) { cases := []struct { - uam UnannounceMessage + uam PublishNamespaceDoneMessage buf []byte expect []byte }{ { - uam: UnannounceMessage{ + uam: PublishNamespaceDoneMessage{ TrackNamespace: []string{""}, }, buf: []byte{}, @@ -24,7 +24,7 @@ func TestUnannounceMessageAppend(t *testing.T) { }, }, { - uam: UnannounceMessage{ + uam: PublishNamespaceDoneMessage{ TrackNamespace: []string{"tracknamespace"}, }, buf: []byte{0x0a, 0x0b}, @@ -42,31 +42,31 @@ func TestUnannounceMessageAppend(t *testing.T) { func TestParseUnannounceMessage(t *testing.T) { cases := []struct { data []byte - expect *UnannounceMessage + expect *PublishNamespaceDoneMessage err error }{ { data: nil, - expect: &UnannounceMessage{}, + expect: &PublishNamespaceDoneMessage{}, err: io.EOF, }, { data: append([]byte{0x01, 0x0E}, "tracknamespace"...), - expect: &UnannounceMessage{ + expect: &PublishNamespaceDoneMessage{ TrackNamespace: []string{"tracknamespace"}, }, err: nil, }, { data: append([]byte{0x01, 0x05}, "tracknamespace"...), - expect: &UnannounceMessage{ + expect: &PublishNamespaceDoneMessage{ TrackNamespace: []string{"track"}, }, err: nil, }, { data: append([]byte{0x01, 0x0F}, "tracknamespace"...), - expect: &UnannounceMessage{ + expect: &PublishNamespaceDoneMessage{ TrackNamespace: []string{}, }, err: errLengthMismatch, @@ -74,7 +74,7 @@ func TestParseUnannounceMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &UnannounceMessage{} + res := &PublishNamespaceDoneMessage{} err := res.parse(CurrentVersion, tc.data) if tc.err != nil { assert.Equal(t, tc.err, err) diff --git a/internal/wire/announce_error_message.go b/internal/wire/publish_namespace_error_message.go similarity index 63% rename from internal/wire/announce_error_message.go rename to internal/wire/publish_namespace_error_message.go index bbf21955..71a3146f 100644 --- a/internal/wire/announce_error_message.go +++ b/internal/wire/publish_namespace_error_message.go @@ -6,32 +6,32 @@ import ( "github.com/quic-go/quic-go/quicvarint" ) -type AnnounceErrorMessage struct { +type PublishNamespaceErrorMessage struct { RequestID uint64 ErrorCode uint64 ReasonPhrase string } -func (m *AnnounceErrorMessage) LogValue() slog.Value { +func (m *PublishNamespaceErrorMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "announce_error"), + slog.String("type", "publish_namespace_error"), slog.Uint64("error_code", m.ErrorCode), slog.String("reason", m.ReasonPhrase), ) } -func (m AnnounceErrorMessage) Type() controlMessageType { - return messageTypeAnnounceError +func (m PublishNamespaceErrorMessage) Type() controlMessageType { + return messageTypePublishNamespaceError } -func (m *AnnounceErrorMessage) Append(buf []byte) []byte { +func (m *PublishNamespaceErrorMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = quicvarint.Append(buf, m.ErrorCode) buf = appendVarIntBytes(buf, []byte(m.ReasonPhrase)) return buf } -func (m *AnnounceErrorMessage) parse(_ Version, data []byte) (err error) { +func (m *PublishNamespaceErrorMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/announce_error_message_test.go b/internal/wire/publish_namespace_error_message_test.go similarity index 82% rename from internal/wire/announce_error_message_test.go rename to internal/wire/publish_namespace_error_message_test.go index 685e7f8d..b628c97a 100644 --- a/internal/wire/announce_error_message_test.go +++ b/internal/wire/publish_namespace_error_message_test.go @@ -10,12 +10,12 @@ import ( func TestAnnounceErrorMessageAppend(t *testing.T) { cases := []struct { - aem AnnounceErrorMessage + aem PublishNamespaceErrorMessage buf []byte expect []byte }{ { - aem: AnnounceErrorMessage{ + aem: PublishNamespaceErrorMessage{ RequestID: 0, ErrorCode: 0, ReasonPhrase: "", @@ -26,7 +26,7 @@ func TestAnnounceErrorMessageAppend(t *testing.T) { }, }, { - aem: AnnounceErrorMessage{ + aem: PublishNamespaceErrorMessage{ RequestID: 1, ErrorCode: 1, ReasonPhrase: "reason", @@ -35,7 +35,7 @@ func TestAnnounceErrorMessageAppend(t *testing.T) { expect: append([]byte{0x01, 0x01, 0x06}, "reason"...), }, { - aem: AnnounceErrorMessage{ + aem: PublishNamespaceErrorMessage{ RequestID: 1, ErrorCode: 1, ReasonPhrase: "reason", @@ -55,17 +55,17 @@ func TestAnnounceErrorMessageAppend(t *testing.T) { func TestParseAnnounceErrorMessage(t *testing.T) { cases := []struct { data []byte - expect *AnnounceErrorMessage + expect *PublishNamespaceErrorMessage err error }{ { data: nil, - expect: &AnnounceErrorMessage{}, + expect: &PublishNamespaceErrorMessage{}, err: io.EOF, }, { data: []byte{0x01, 0x03, 0x03, 'e', 'r'}, - expect: &AnnounceErrorMessage{ + expect: &PublishNamespaceErrorMessage{ RequestID: 1, ErrorCode: 3, ReasonPhrase: "", @@ -74,7 +74,7 @@ func TestParseAnnounceErrorMessage(t *testing.T) { }, { data: append([]byte{0x00, 0x01, 0x0d}, "reason phrase"...), - expect: &AnnounceErrorMessage{ + expect: &PublishNamespaceErrorMessage{ RequestID: 0, ErrorCode: 1, ReasonPhrase: "reason phrase", @@ -84,7 +84,7 @@ func TestParseAnnounceErrorMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &AnnounceErrorMessage{} + res := &PublishNamespaceErrorMessage{} err := res.parse(CurrentVersion, tc.data) if tc.err != nil { assert.Equal(t, tc.err, err) diff --git a/internal/wire/announce_message.go b/internal/wire/publish_namespace_message.go similarity index 66% rename from internal/wire/announce_message.go rename to internal/wire/publish_namespace_message.go index 029edae8..92069c28 100644 --- a/internal/wire/announce_message.go +++ b/internal/wire/publish_namespace_message.go @@ -6,15 +6,15 @@ import ( "github.com/quic-go/quic-go/quicvarint" ) -type AnnounceMessage struct { +type PublishNamespaceMessage struct { RequestID uint64 TrackNamespace Tuple Parameters KVPList } -func (m *AnnounceMessage) LogValue() slog.Value { +func (m *PublishNamespaceMessage) LogValue() slog.Value { attrs := []slog.Attr{ - slog.String("type", "announce"), + slog.String("type", "publish_namespace"), slog.Any("track_namespace", m.TrackNamespace), slog.Uint64("number_of_parameters", uint64(len(m.Parameters))), } @@ -26,21 +26,21 @@ func (m *AnnounceMessage) LogValue() slog.Value { return slog.GroupValue(attrs...) } -func (m AnnounceMessage) GetTrackNamespace() string { +func (m PublishNamespaceMessage) GetTrackNamespace() string { return m.TrackNamespace.String() } -func (m AnnounceMessage) Type() controlMessageType { - return messageTypeAnnounce +func (m PublishNamespaceMessage) Type() controlMessageType { + return messageTypePublishNamespace } -func (m *AnnounceMessage) Append(buf []byte) []byte { +func (m *PublishNamespaceMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = m.TrackNamespace.append(buf) return m.Parameters.appendNum(buf) } -func (m *AnnounceMessage) parse(_ Version, data []byte) (err error) { +func (m *PublishNamespaceMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/announce_message_test.go b/internal/wire/publish_namespace_message_test.go similarity index 83% rename from internal/wire/announce_message_test.go rename to internal/wire/publish_namespace_message_test.go index adebd50e..d59e5404 100644 --- a/internal/wire/announce_message_test.go +++ b/internal/wire/publish_namespace_message_test.go @@ -10,12 +10,12 @@ import ( func TestAnnounceMessageAppend(t *testing.T) { cases := []struct { - am AnnounceMessage + am PublishNamespaceMessage buf []byte expect []byte }{ { - am: AnnounceMessage{ + am: PublishNamespaceMessage{ RequestID: 0, TrackNamespace: []string{""}, Parameters: KVPList{}, @@ -26,7 +26,7 @@ func TestAnnounceMessageAppend(t *testing.T) { }, }, { - am: AnnounceMessage{ + am: PublishNamespaceMessage{ RequestID: 1, TrackNamespace: []string{"tracknamespace"}, Parameters: KVPList{}, @@ -46,22 +46,22 @@ func TestAnnounceMessageAppend(t *testing.T) { func TestParseAnnounceMessage(t *testing.T) { cases := []struct { data []byte - expect *AnnounceMessage + expect *PublishNamespaceMessage err error }{ { data: nil, - expect: &AnnounceMessage{}, + expect: &PublishNamespaceMessage{}, err: io.EOF, }, { data: []byte{}, - expect: &AnnounceMessage{}, + expect: &PublishNamespaceMessage{}, err: io.EOF, }, { data: append(append([]byte{0x00, 0x01, 0x09}, "trackname"...), 0x00), - expect: &AnnounceMessage{ + expect: &PublishNamespaceMessage{ RequestID: 0, TrackNamespace: []string{"trackname"}, Parameters: KVPList{}, @@ -71,7 +71,7 @@ func TestParseAnnounceMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &AnnounceMessage{} + res := &PublishNamespaceMessage{} err := res.parse(CurrentVersion, tc.data) assert.Equal(t, tc.expect, res) if tc.err != nil { diff --git a/internal/wire/publish_namespace_ok_message.go b/internal/wire/publish_namespace_ok_message.go new file mode 100644 index 00000000..831189d0 --- /dev/null +++ b/internal/wire/publish_namespace_ok_message.go @@ -0,0 +1,30 @@ +package wire + +import ( + "log/slog" + + "github.com/quic-go/quic-go/quicvarint" +) + +type PublishNamespaceOkMessage struct { + RequestID uint64 +} + +func (m *PublishNamespaceOkMessage) LogValue() slog.Value { + return slog.GroupValue( + slog.String("type", "publish_namespace_ok"), + ) +} + +func (m PublishNamespaceOkMessage) Type() controlMessageType { + return messageTypePublishNamespaceOk +} + +func (m *PublishNamespaceOkMessage) Append(buf []byte) []byte { + return quicvarint.Append(buf, m.RequestID) +} + +func (m *PublishNamespaceOkMessage) parse(_ Version, data []byte) (err error) { + m.RequestID, _, err = quicvarint.Parse(data) + return err +} diff --git a/internal/wire/announce_ok_message_test.go b/internal/wire/publish_namespace_ok_message_test.go similarity index 77% rename from internal/wire/announce_ok_message_test.go rename to internal/wire/publish_namespace_ok_message_test.go index baadab6f..16c1b9c2 100644 --- a/internal/wire/announce_ok_message_test.go +++ b/internal/wire/publish_namespace_ok_message_test.go @@ -10,12 +10,12 @@ import ( func TestAnnounceOkMessageAppend(t *testing.T) { cases := []struct { - aom AnnounceOkMessage + aom PublishNamespaceOkMessage buf []byte expect []byte }{ { - aom: AnnounceOkMessage{ + aom: PublishNamespaceOkMessage{ RequestID: 1, }, buf: []byte{}, @@ -24,7 +24,7 @@ func TestAnnounceOkMessageAppend(t *testing.T) { }, }, { - aom: AnnounceOkMessage{ + aom: PublishNamespaceOkMessage{ RequestID: 1, }, buf: []byte{0x0a, 0x0b}, @@ -42,31 +42,31 @@ func TestAnnounceOkMessageAppend(t *testing.T) { func TestParseAnnounceOkMessage(t *testing.T) { cases := []struct { data []byte - expect *AnnounceOkMessage + expect *PublishNamespaceOkMessage err error }{ { data: nil, - expect: &AnnounceOkMessage{}, + expect: &PublishNamespaceOkMessage{}, err: io.EOF, }, { data: []byte{0x01}, - expect: &AnnounceOkMessage{ + expect: &PublishNamespaceOkMessage{ RequestID: 1, }, err: nil, }, { data: []byte{0x01}, - expect: &AnnounceOkMessage{ + expect: &PublishNamespaceOkMessage{ RequestID: 1, }, err: nil, }, { data: []byte{}, - expect: &AnnounceOkMessage{ + expect: &PublishNamespaceOkMessage{ RequestID: 0, }, err: io.EOF, @@ -74,7 +74,7 @@ func TestParseAnnounceOkMessage(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - res := &AnnounceOkMessage{} + res := &PublishNamespaceOkMessage{} err := res.parse(CurrentVersion, tc.data) assert.Equal(t, tc.expect, res) if tc.err != nil { diff --git a/internal/wire/subscribe_announces_ok_message.go b/internal/wire/subscribe_announces_ok_message.go deleted file mode 100644 index 0a43aba3..00000000 --- a/internal/wire/subscribe_announces_ok_message.go +++ /dev/null @@ -1,31 +0,0 @@ -package wire - -import ( - "log/slog" - - "github.com/quic-go/quic-go/quicvarint" -) - -// TODO: Add tests -type SubscribeAnnouncesOkMessage struct { - RequestID uint64 -} - -func (m *SubscribeAnnouncesOkMessage) LogValue() slog.Value { - return slog.GroupValue( - slog.String("type", "subscribe_announces_ok"), - ) -} - -func (m SubscribeAnnouncesOkMessage) Type() controlMessageType { - return messageTypeSubscribeNamespaceOk -} - -func (m *SubscribeAnnouncesOkMessage) Append(buf []byte) []byte { - return quicvarint.Append(buf, m.RequestID) -} - -func (m *SubscribeAnnouncesOkMessage) parse(_ Version, data []byte) (err error) { - m.RequestID, _, err = quicvarint.Parse(data) - return err -} diff --git a/internal/wire/subscribe_announces_error_message.go b/internal/wire/subscribe_namespace_error_message.go similarity index 70% rename from internal/wire/subscribe_announces_error_message.go rename to internal/wire/subscribe_namespace_error_message.go index 1be04c11..53fa1911 100644 --- a/internal/wire/subscribe_announces_error_message.go +++ b/internal/wire/subscribe_namespace_error_message.go @@ -7,31 +7,31 @@ import ( ) // TODO: Add tests -type SubscribeAnnouncesErrorMessage struct { +type SubscribeNamespaceErrorMessage struct { RequestID uint64 ErrorCode uint64 ReasonPhrase string } -func (m *SubscribeAnnouncesErrorMessage) LogValue() slog.Value { +func (m *SubscribeNamespaceErrorMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "subscribe_announces_error"), + slog.String("type", "subscribe_namespace_error"), slog.Uint64("error_code", m.ErrorCode), slog.String("reason", m.ReasonPhrase), ) } -func (m SubscribeAnnouncesErrorMessage) Type() controlMessageType { +func (m SubscribeNamespaceErrorMessage) Type() controlMessageType { return messageTypeSubscribeNamespaceError } -func (m *SubscribeAnnouncesErrorMessage) Append(buf []byte) []byte { +func (m *SubscribeNamespaceErrorMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = quicvarint.Append(buf, m.ErrorCode) return appendVarIntBytes(buf, []byte(m.ReasonPhrase)) } -func (m *SubscribeAnnouncesErrorMessage) parse(_ Version, data []byte) (err error) { +func (m *SubscribeNamespaceErrorMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/subscribe_announces_message.go b/internal/wire/subscribe_namespace_message.go similarity index 73% rename from internal/wire/subscribe_announces_message.go rename to internal/wire/subscribe_namespace_message.go index 23883812..32be114f 100644 --- a/internal/wire/subscribe_announces_message.go +++ b/internal/wire/subscribe_namespace_message.go @@ -7,15 +7,15 @@ import ( ) // TODO: Add tests -type SubscribeAnnouncesMessage struct { +type SubscribeNamespaceMessage struct { RequestID uint64 TrackNamespacePrefix Tuple Parameters KVPList } -func (m *SubscribeAnnouncesMessage) LogValue() slog.Value { +func (m *SubscribeNamespaceMessage) LogValue() slog.Value { attrs := []slog.Attr{ - slog.String("type", "subscribe_announces"), + slog.String("type", "subscribe_namespace"), slog.Any("track_namespace_prefix", m.TrackNamespacePrefix), slog.Uint64("number_of_parameters", uint64(len(m.Parameters))), } @@ -27,17 +27,17 @@ func (m *SubscribeAnnouncesMessage) LogValue() slog.Value { return slog.GroupValue(attrs...) } -func (m SubscribeAnnouncesMessage) Type() controlMessageType { +func (m SubscribeNamespaceMessage) Type() controlMessageType { return messageTypeSubscribeNamespace } -func (m *SubscribeAnnouncesMessage) Append(buf []byte) []byte { +func (m *SubscribeNamespaceMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) buf = m.TrackNamespacePrefix.append(buf) return m.Parameters.appendNum(buf) } -func (m *SubscribeAnnouncesMessage) parse(_ Version, data []byte) (err error) { +func (m *SubscribeNamespaceMessage) parse(_ Version, data []byte) (err error) { var n int m.RequestID, n, err = quicvarint.Parse(data) if err != nil { diff --git a/internal/wire/subscribe_namespace_ok_message.go b/internal/wire/subscribe_namespace_ok_message.go new file mode 100644 index 00000000..568cf5c8 --- /dev/null +++ b/internal/wire/subscribe_namespace_ok_message.go @@ -0,0 +1,31 @@ +package wire + +import ( + "log/slog" + + "github.com/quic-go/quic-go/quicvarint" +) + +// TODO: Add tests +type SubscribeNamespaceOkMessage struct { + RequestID uint64 +} + +func (m *SubscribeNamespaceOkMessage) LogValue() slog.Value { + return slog.GroupValue( + slog.String("type", "subscribe_namespace_ok"), + ) +} + +func (m SubscribeNamespaceOkMessage) Type() controlMessageType { + return messageTypeSubscribeNamespaceOk +} + +func (m *SubscribeNamespaceOkMessage) Append(buf []byte) []byte { + return quicvarint.Append(buf, m.RequestID) +} + +func (m *SubscribeNamespaceOkMessage) parse(_ Version, data []byte) (err error) { + m.RequestID, _, err = quicvarint.Parse(data) + return err +} diff --git a/internal/wire/subscribe_update_message.go b/internal/wire/subscribe_update_message.go index 341085f2..edae6c4c 100644 --- a/internal/wire/subscribe_update_message.go +++ b/internal/wire/subscribe_update_message.go @@ -7,12 +7,13 @@ import ( ) type SubscribeUpdateMessage struct { - RequestID uint64 - StartLocation Location - EndGroup uint64 - SubscriberPriority uint8 - Forward uint8 - Parameters KVPList + RequestID uint64 + SubscriptionRequestID uint64 + StartLocation Location + EndGroup uint64 + SubscriberPriority uint8 + Forward uint8 + Parameters KVPList } func (m *SubscribeUpdateMessage) LogValue() slog.Value { @@ -39,6 +40,7 @@ func (m SubscribeUpdateMessage) Type() controlMessageType { func (m *SubscribeUpdateMessage) Append(buf []byte) []byte { buf = quicvarint.Append(buf, m.RequestID) + buf = quicvarint.Append(buf, m.SubscriptionRequestID) buf = m.StartLocation.append(buf) buf = quicvarint.Append(buf, m.EndGroup) buf = append(buf, m.SubscriberPriority) @@ -55,6 +57,12 @@ func (m *SubscribeUpdateMessage) parse(v Version, data []byte) (err error) { } data = data[n:] + m.SubscriptionRequestID, n, err = quicvarint.Parse(data) + if err != nil { + return err + } + data = data[n:] + n, err = m.StartLocation.parse(v, data) if err != nil { return err diff --git a/internal/wire/subscribe_update_message_test.go b/internal/wire/subscribe_update_message_test.go index 674ce856..ecd29264 100644 --- a/internal/wire/subscribe_update_message_test.go +++ b/internal/wire/subscribe_update_message_test.go @@ -16,7 +16,8 @@ func TestSubscribeUpdateMessageAppend(t *testing.T) { }{ { sum: SubscribeUpdateMessage{ - RequestID: 0, + RequestID: 0, + SubscriptionRequestID: 0, StartLocation: Location{ Group: 0, Object: 0, @@ -28,12 +29,13 @@ func TestSubscribeUpdateMessageAppend(t *testing.T) { }, buf: []byte{}, expect: []byte{ - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, }, }, { sum: SubscribeUpdateMessage{ - RequestID: 1, + RequestID: 1, + SubscriptionRequestID: 9, StartLocation: Location{ Group: 2, Object: 3, @@ -44,11 +46,12 @@ func TestSubscribeUpdateMessageAppend(t *testing.T) { Parameters: KVPList{KeyValuePair{Type: PathParameterKey, ValueBytes: []byte("A")}}, }, buf: []byte{}, - expect: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'A'}, + expect: []byte{0x01, 0x09, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'A'}, }, { sum: SubscribeUpdateMessage{ - RequestID: 1, + RequestID: 1, + SubscriptionRequestID: 9, StartLocation: Location{ Group: 2, Object: 3, @@ -59,7 +62,7 @@ func TestSubscribeUpdateMessageAppend(t *testing.T) { Parameters: KVPList{KeyValuePair{Type: PathParameterKey, ValueBytes: []byte("A")}}, }, buf: []byte{0x0a, 0x0b}, - expect: []byte{0x0a, 0x0b, 0x01, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'A'}, + expect: []byte{0x0a, 0x0b, 0x01, 0x09, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'A'}, }, } for i, tc := range cases { @@ -87,9 +90,10 @@ func TestParseSubscribeUpdateMessage(t *testing.T) { err: io.EOF, }, { - data: []byte{0x00, 0x01, 0x02}, + data: []byte{0x00, 0x09, 0x01, 0x02}, expect: &SubscribeUpdateMessage{ - RequestID: 0, + RequestID: 0, + SubscriptionRequestID: 9, StartLocation: Location{ Group: 1, Object: 2, @@ -102,9 +106,10 @@ func TestParseSubscribeUpdateMessage(t *testing.T) { err: io.EOF, }, { - data: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'P'}, + data: []byte{0x01, 0x09, 0x02, 0x03, 0x04, 0x05, 0x01, 0x01, 0x01, 0x01, 'P'}, expect: &SubscribeUpdateMessage{ - RequestID: 1, + RequestID: 1, + SubscriptionRequestID: 9, StartLocation: Location{ Group: 2, Object: 3, diff --git a/internal/wire/unannounce_message.go b/internal/wire/unannounce_message.go deleted file mode 100644 index 232b7ea8..00000000 --- a/internal/wire/unannounce_message.go +++ /dev/null @@ -1,30 +0,0 @@ -package wire - -import ( - "log/slog" -) - -type UnannounceMessage struct { - TrackNamespace Tuple -} - -func (m *UnannounceMessage) LogValue() slog.Value { - return slog.GroupValue( - slog.String("type", "unannounce"), - slog.Any("track_namespace", m.TrackNamespace), - ) -} - -func (m UnannounceMessage) Type() controlMessageType { - return messageTypeUnannounce -} - -func (m *UnannounceMessage) Append(buf []byte) []byte { - buf = m.TrackNamespace.append(buf) - return buf -} - -func (p *UnannounceMessage) parse(_ Version, data []byte) (err error) { - p.TrackNamespace, _, err = parseTuple(data) - return err -} diff --git a/internal/wire/unsubscribe_announces_message.go b/internal/wire/unsubscribe_namespace_message.go similarity index 50% rename from internal/wire/unsubscribe_announces_message.go rename to internal/wire/unsubscribe_namespace_message.go index 634224db..9dd0669b 100644 --- a/internal/wire/unsubscribe_announces_message.go +++ b/internal/wire/unsubscribe_namespace_message.go @@ -5,26 +5,26 @@ import ( ) // TODO: Add tests -type UnsubscribeAnnouncesMessage struct { +type UnsubscribeNamespaceMessage struct { TrackNamespacePrefix Tuple } -func (m *UnsubscribeAnnouncesMessage) LogValue() slog.Value { +func (m *UnsubscribeNamespaceMessage) LogValue() slog.Value { return slog.GroupValue( - slog.String("type", "unsubscribe_announces"), + slog.String("type", "unsubscribe_namespace"), slog.Any("track_namespace_prefix", m.TrackNamespacePrefix), ) } -func (m UnsubscribeAnnouncesMessage) Type() controlMessageType { +func (m UnsubscribeNamespaceMessage) Type() controlMessageType { return messageTypeUnsubscribeNamespace } -func (m *UnsubscribeAnnouncesMessage) Append(buf []byte) []byte { +func (m *UnsubscribeNamespaceMessage) Append(buf []byte) []byte { return m.TrackNamespacePrefix.append(buf) } -func (m *UnsubscribeAnnouncesMessage) parse(_ Version, data []byte) (err error) { +func (m *UnsubscribeNamespaceMessage) parse(_ Version, data []byte) (err error) { m.TrackNamespacePrefix, _, err = parseTuple(data) return err } diff --git a/session.go b/session.go index ec950ee8..ff516e16 100644 --- a/session.go +++ b/session.go @@ -649,7 +649,7 @@ func (s *Session) subscriptionDone(id, code, count uint64, reason string) error if !ok { return errUnknownRequestID } - return s.controlStream.write(&wire.SubscribeDoneMessage{ + return s.controlStream.write(&wire.PublishDoneMessage{ RequestID: lt.requestID, StatusCode: code, StreamCount: count, @@ -808,7 +808,7 @@ func (s *Session) Announce(ctx context.Context, namespace []string) error { response: make(chan error, 1), } s.outgoingAnnouncements.add(a) - am := &wire.AnnounceMessage{ + am := &wire.PublishNamespaceMessage{ RequestID: a.requestID, TrackNamespace: a.namespace, Parameters: a.parameters, @@ -829,7 +829,7 @@ func (s *Session) acceptAnnouncement(requestID uint64) error { if _, err := s.incomingAnnouncements.confirmAndGet(requestID); err != nil { return err } - if err := s.controlStream.write(&wire.AnnounceOkMessage{ + if err := s.controlStream.write(&wire.PublishNamespaceOkMessage{ RequestID: requestID, }); err != nil { return err @@ -838,7 +838,7 @@ func (s *Session) acceptAnnouncement(requestID uint64) error { } func (s *Session) rejectAnnouncement(requestID uint64, c uint64, r string) error { - return s.controlStream.write(&wire.AnnounceErrorMessage{ + return s.controlStream.write(&wire.PublishNamespaceErrorMessage{ RequestID: requestID, ErrorCode: c, ReasonPhrase: r, @@ -849,7 +849,7 @@ func (s *Session) Unannounce(ctx context.Context, namespace []string) error { if ok := s.outgoingAnnouncements.delete(namespace); ok { return errUnknownAnnouncementNamespace } - u := &wire.UnannounceMessage{ + u := &wire.PublishNamespaceDoneMessage{ TrackNamespace: namespace, } return s.controlStream.write(u) @@ -859,7 +859,7 @@ func (s *Session) AnnounceCancel(ctx context.Context, namespace []string, errorC if !s.incomingAnnouncements.delete(namespace) { return errUnknownAnnouncementNamespace } - acm := &wire.AnnounceCancelMessage{ + acm := &wire.PublishNamespaceCancelMessage{ TrackNamespace: namespace, ErrorCode: errorCode, ReasonPhrase: reason, @@ -880,7 +880,7 @@ func (s *Session) SubscribeAnnouncements(ctx context.Context, prefix []string) e response: make(chan announcementSubscriptionResponse, 1), } s.pendingOutgointAnnouncementSubscriptions.add(as) - sam := &wire.SubscribeAnnouncesMessage{ + sam := &wire.SubscribeNamespaceMessage{ RequestID: as.requestID, TrackNamespacePrefix: as.namespace, Parameters: wire.KVPList{}, @@ -898,13 +898,13 @@ func (s *Session) SubscribeAnnouncements(ctx context.Context, prefix []string) e } func (s *Session) acceptAnnouncementSubscription(requestID uint64) error { - return s.controlStream.write(&wire.SubscribeAnnouncesOkMessage{ + return s.controlStream.write(&wire.SubscribeNamespaceOkMessage{ RequestID: requestID, }) } func (s *Session) rejectAnnouncementSubscription(requestID uint64, c uint64, r string) error { - return s.controlStream.write(&wire.SubscribeAnnouncesErrorMessage{ + return s.controlStream.write(&wire.SubscribeNamespaceErrorMessage{ RequestID: requestID, ErrorCode: c, ReasonPhrase: r, @@ -913,7 +913,7 @@ func (s *Session) rejectAnnouncementSubscription(requestID uint64, c uint64, r s func (s *Session) UnsubscribeAnnouncements(ctx context.Context, namespace []string) error { s.pendingOutgointAnnouncementSubscriptions.delete(namespace) - uam := &wire.UnsubscribeAnnouncesMessage{ + uam := &wire.UnsubscribeNamespaceMessage{ TrackNamespacePrefix: namespace, } return s.controlStream.write(uam) @@ -952,7 +952,7 @@ func (s *Session) receive(msg wire.ControlMessage) error { err = s.onSubscribeUpdate(m) case *wire.UnsubscribeMessage: err = s.onUnsubscribe(m) - case *wire.SubscribeDoneMessage: + case *wire.PublishDoneMessage: err = s.onSubscribeDone(m) case *wire.FetchMessage: err = s.onFetch(m) @@ -966,23 +966,23 @@ func (s *Session) receive(msg wire.ControlMessage) error { err = s.onTrackStatusRequest(m) case *wire.TrackStatusMessage: err = s.onTrackStatus(m) - case *wire.AnnounceMessage: + case *wire.PublishNamespaceMessage: err = s.onAnnounce(m) - case *wire.AnnounceOkMessage: + case *wire.PublishNamespaceOkMessage: err = s.onAnnounceOk(m) - case *wire.AnnounceErrorMessage: + case *wire.PublishNamespaceErrorMessage: err = s.onAnnounceError(m) - case *wire.UnannounceMessage: + case *wire.PublishNamespaceDoneMessage: err = s.onUnannounce(m) - case *wire.AnnounceCancelMessage: + case *wire.PublishNamespaceCancelMessage: err = s.onAnnounceCancel(m) - case *wire.SubscribeAnnouncesMessage: + case *wire.SubscribeNamespaceMessage: err = s.onSubscribeAnnounces(m) - case *wire.SubscribeAnnouncesOkMessage: + case *wire.SubscribeNamespaceOkMessage: err = s.onSubscribeAnnouncesOk(m) - case *wire.SubscribeAnnouncesErrorMessage: + case *wire.SubscribeNamespaceErrorMessage: err = s.onSubscribeAnnouncesError(m) - case *wire.UnsubscribeAnnouncesMessage: + case *wire.UnsubscribeNamespaceMessage: s.onUnsubscribeAnnounces(m) default: err = errUnexpectedMessageType @@ -1218,7 +1218,7 @@ func (s *Session) onUnsubscribe(msg *wire.UnsubscribeMessage) error { return nil } -func (s *Session) onSubscribeDone(msg *wire.SubscribeDoneMessage) error { +func (s *Session) onSubscribeDone(msg *wire.PublishDoneMessage) error { sub, ok := s.remoteTracks.findByRequestID(msg.RequestID) if !ok { return errUnknownRequestID @@ -1349,7 +1349,7 @@ func (s *Session) onTrackStatus(msg *wire.TrackStatusMessage) error { return nil } -func (s *Session) onAnnounce(msg *wire.AnnounceMessage) error { +func (s *Session) onAnnounce(msg *wire.PublishNamespaceMessage) error { if len(msg.TrackNamespace) == 0 || len(msg.TrackNamespace) > 32 { return errInvalidNamespaceLength } @@ -1377,7 +1377,7 @@ func (s *Session) onAnnounce(msg *wire.AnnounceMessage) error { return nil } -func (s *Session) onAnnounceOk(msg *wire.AnnounceOkMessage) error { +func (s *Session) onAnnounceOk(msg *wire.PublishNamespaceOkMessage) error { announcement, err := s.outgoingAnnouncements.confirmAndGet(msg.RequestID) if err != nil { return errUnknownAnnouncement @@ -1390,7 +1390,7 @@ func (s *Session) onAnnounceOk(msg *wire.AnnounceOkMessage) error { return nil } -func (s *Session) onAnnounceError(msg *wire.AnnounceErrorMessage) error { +func (s *Session) onAnnounceError(msg *wire.PublishNamespaceErrorMessage) error { announcement, ok := s.outgoingAnnouncements.reject(msg.RequestID) if !ok { return errUnknownAnnouncement @@ -1406,7 +1406,7 @@ func (s *Session) onAnnounceError(msg *wire.AnnounceErrorMessage) error { return nil } -func (s *Session) onUnannounce(msg *wire.UnannounceMessage) error { +func (s *Session) onUnannounce(msg *wire.PublishNamespaceDoneMessage) error { if len(msg.TrackNamespace) == 0 || len(msg.TrackNamespace) > 32 { return errInvalidNamespaceLength } @@ -1420,7 +1420,7 @@ func (s *Session) onUnannounce(msg *wire.UnannounceMessage) error { return nil } -func (s *Session) onAnnounceCancel(msg *wire.AnnounceCancelMessage) error { +func (s *Session) onAnnounceCancel(msg *wire.PublishNamespaceCancelMessage) error { if len(msg.TrackNamespace) == 0 || len(msg.TrackNamespace) > 32 { return errInvalidNamespaceLength } @@ -1433,7 +1433,7 @@ func (s *Session) onAnnounceCancel(msg *wire.AnnounceCancelMessage) error { return nil } -func (s *Session) onSubscribeAnnounces(msg *wire.SubscribeAnnouncesMessage) error { +func (s *Session) onSubscribeAnnounces(msg *wire.SubscribeNamespaceMessage) error { s.pendingIncomingAnnouncementSubscriptions.add(&announcementSubscription{ requestID: msg.RequestID, namespace: msg.TrackNamespacePrefix, @@ -1455,7 +1455,7 @@ func (s *Session) onSubscribeAnnounces(msg *wire.SubscribeAnnouncesMessage) erro return nil } -func (s *Session) onSubscribeAnnouncesOk(msg *wire.SubscribeAnnouncesOkMessage) error { +func (s *Session) onSubscribeAnnouncesOk(msg *wire.SubscribeNamespaceOkMessage) error { as, ok := s.pendingOutgointAnnouncementSubscriptions.deleteByID(msg.RequestID) if !ok { return errUnknownSubscribeAnnouncesPrefix @@ -1470,7 +1470,7 @@ func (s *Session) onSubscribeAnnouncesOk(msg *wire.SubscribeAnnouncesOkMessage) return nil } -func (s *Session) onSubscribeAnnouncesError(msg *wire.SubscribeAnnouncesErrorMessage) error { +func (s *Session) onSubscribeAnnouncesError(msg *wire.SubscribeNamespaceErrorMessage) error { as, ok := s.pendingOutgointAnnouncementSubscriptions.deleteByID(msg.RequestID) if !ok { return errUnknownSubscribeAnnouncesPrefix @@ -1488,7 +1488,7 @@ func (s *Session) onSubscribeAnnouncesError(msg *wire.SubscribeAnnouncesErrorMes return nil } -func (s *Session) onUnsubscribeAnnounces(msg *wire.UnsubscribeAnnouncesMessage) { +func (s *Session) onUnsubscribeAnnounces(msg *wire.UnsubscribeNamespaceMessage) { s.Handler.Handle(nil, &Message{ Method: MessageUnsubscribeAnnounces, Namespace: msg.TrackNamespacePrefix, diff --git a/session_test.go b/session_test.go index 07196d4e..6e7663f1 100644 --- a/session_test.go +++ b/session_test.go @@ -371,12 +371,12 @@ func TestSession(t *testing.T) { s := newSession(conn, cs, nil) s.handshakeDone.Store(true) - cs.EXPECT().write(&wire.AnnounceMessage{ + cs.EXPECT().write(&wire.PublishNamespaceMessage{ RequestID: 0, TrackNamespace: []string{"namespace"}, Parameters: wire.KVPList{}, }).DoAndReturn(func(_ wire.ControlMessage) error { - err := s.receive(&wire.AnnounceOkMessage{ + err := s.receive(&wire.PublishNamespaceOkMessage{ RequestID: 0, }) assert.NoError(t, err) @@ -404,10 +404,10 @@ func TestSession(t *testing.T) { }).DoAndReturn(func(rw ResponseWriter, req *Message) { assert.NoError(t, rw.Accept()) }) - cs.EXPECT().write(&wire.AnnounceOkMessage{ + cs.EXPECT().write(&wire.PublishNamespaceOkMessage{ RequestID: 2, }) - err := s.receive(&wire.AnnounceMessage{ + err := s.receive(&wire.PublishNamespaceMessage{ RequestID: 2, TrackNamespace: []string{"namespace"}, Parameters: wire.KVPList{},