Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions internal/wire/announce_ok_message.go

This file was deleted.

32 changes: 16 additions & 16 deletions internal/wire/control_message_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) {
m = &UnsubscribeMessage{}
case messageTypeSubscribeUpdate:
m = &SubscribeUpdateMessage{}
case messageTypeSubscribeDone:
m = &SubscribeDoneMessage{}
case messageTypePublishDone:
m = &PublishDoneMessage{}

case messageTypeFetch:
m = &FetchMessage{}
Expand All @@ -84,25 +84,25 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) {
case messageTypeTrackStatusOk:
m = &TrackStatusMessage{}

case messageTypeAnnounce:
m = &AnnounceMessage{}
case messageTypeAnnounceOk:
m = &AnnounceOkMessage{}
case messageTypeAnnounceError:
m = &AnnounceErrorMessage{}
case messageTypeUnannounce:
m = &UnannounceMessage{}
case messageTypeAnnounceCancel:
m = &AnnounceCancelMessage{}
case messageTypePublishNamespace:
m = &PublishNamespaceMessage{}
case messageTypePublishNamespaceOk:
m = &PublishNamespaceOkMessage{}
case messageTypePublishNamespaceError:
m = &PublishNamespaceErrorMessage{}
case messageTypePublishNamespaceDone:
m = &PublishNamespaceDoneMessage{}
case messageTypePublishNamespaceCancel:
m = &PublishNamespaceCancelMessage{}

case messageTypeSubscribeNamespace:
m = &SubscribeAnnouncesMessage{}
m = &SubscribeNamespaceMessage{}
case messageTypeSubscribeNamespaceOk:
m = &SubscribeAnnouncesOkMessage{}
m = &SubscribeNamespaceOkMessage{}
case messageTypeSubscribeNamespaceError:
m = &SubscribeAnnouncesErrorMessage{}
m = &SubscribeNamespaceErrorMessage{}
case messageTypeUnsubscribeNamespace:
m = &UnsubscribeAnnouncesMessage{}
m = &UnsubscribeNamespaceMessage{}
default:
return nil, fmt.Errorf("%w: %v", errInvalidMessageType, mt)
}
Expand Down
36 changes: 18 additions & 18 deletions internal/wire/control_message_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ const (
messageTypeSubscribeError controlMessageType = 0x05
messageTypeSubscribeUpdate controlMessageType = 0x02
messageTypeUnsubscribe controlMessageType = 0x0a
messageTypeSubscribeDone controlMessageType = 0x0b

messageTypePublishDone controlMessageType = 0x0b
messageTypePublish controlMessageType = 0x1d
messageTypePublishOk controlMessageType = 0x1e
messageTypePublishError controlMessageType = 0x1f
Expand All @@ -37,11 +37,11 @@ const (
messageTypeTrackStatusOk controlMessageType = 0x0e
messageTypeTrackStatusError controlMessageType = 0x0f

messageTypeAnnounce controlMessageType = 0x06
messageTypeAnnounceOk controlMessageType = 0x07
messageTypeAnnounceError controlMessageType = 0x08
messageTypeUnannounce controlMessageType = 0x09
messageTypeAnnounceCancel controlMessageType = 0x0c
messageTypePublishNamespace controlMessageType = 0x06
messageTypePublishNamespaceOk controlMessageType = 0x07
messageTypePublishNamespaceError controlMessageType = 0x08
messageTypePublishNamespaceDone controlMessageType = 0x09
messageTypePublishNamespaceCancel controlMessageType = 0x0c

messageTypeSubscribeNamespace controlMessageType = 0x11
messageTypeSubscribeNamespaceOk controlMessageType = 0x12
Expand Down Expand Up @@ -74,9 +74,9 @@ func (mt controlMessageType) String() string {
return "Unsubscribe"
case messageTypeSubscribeUpdate:
return "SubscribeUpdate"
case messageTypeSubscribeDone:
return "SubscribeDone"

case messageTypePublishDone:
return "PublishDone"
case messageTypePublish:
return "Publish"
case messageTypePublishOk:
Expand All @@ -100,16 +100,16 @@ func (mt controlMessageType) String() string {
case messageTypeTrackStatusError:
return "TrackStatusError"

case messageTypeAnnounce:
return "Announce"
case messageTypeAnnounceOk:
return "AnnounceOk"
case messageTypeAnnounceError:
return "AnnounceError"
case messageTypeUnannounce:
return "Unannounce"
case messageTypeAnnounceCancel:
return "AnnounceCancel"
case messageTypePublishNamespace:
return "PublishNamespace"
case messageTypePublishNamespaceOk:
return "PublishNamespaceOk"
case messageTypePublishNamespaceError:
return "PublishNamespaceError"
case messageTypePublishNamespaceDone:
return "PublishNamespaceDone"
case messageTypePublishNamespaceCancel:
return "PublishNamespaceCancel"

case messageTypeSubscribeNamespace:
return "SubscribeNamespace"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,36 @@ import (
"github.com/quic-go/quic-go/quicvarint"
)

type SubscribeDoneMessage struct {
type PublishDoneMessage struct {
RequestID uint64
StatusCode uint64
StreamCount uint64
ReasonPhrase string
}

func (m *SubscribeDoneMessage) LogValue() slog.Value {
func (m *PublishDoneMessage) LogValue() slog.Value {
return slog.GroupValue(
slog.String("type", "subscribe_done"),
slog.String("type", "publish_done"),
slog.Uint64("request_id", m.RequestID),
slog.Uint64("status_code", m.StatusCode),
slog.Uint64("stream_count", m.StreamCount),
slog.String("reason", m.ReasonPhrase),
)
}

func (m SubscribeDoneMessage) Type() controlMessageType {
return messageTypeSubscribeDone
func (m PublishDoneMessage) Type() controlMessageType {
return messageTypePublishDone
}

func (m *SubscribeDoneMessage) Append(buf []byte) []byte {
func (m *PublishDoneMessage) Append(buf []byte) []byte {
buf = quicvarint.Append(buf, m.RequestID)
buf = quicvarint.Append(buf, m.StatusCode)
buf = quicvarint.Append(buf, m.StreamCount)
buf = appendVarIntBytes(buf, []byte(m.ReasonPhrase))
return buf
}

func (m *SubscribeDoneMessage) parse(_ Version, data []byte) (err error) {
func (m *PublishDoneMessage) parse(_ Version, data []byte) (err error) {
var n int
m.RequestID, n, err = quicvarint.Parse(data)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (

func TestSubscribeDoneMessageAppend(t *testing.T) {
cases := []struct {
srm SubscribeDoneMessage
srm PublishDoneMessage
buf []byte
expect []byte
}{
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 0,
StatusCode: 0,
StreamCount: 0,
Expand All @@ -25,7 +25,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
expect: []byte{0x00, 0x00, 0x00, 0x00},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 0,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -40,7 +40,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 17,
StatusCode: 1,
StreamCount: 4,
Expand All @@ -56,7 +56,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 0,
StatusCode: 0,
StreamCount: 0,
Expand All @@ -66,7 +66,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
expect: []byte{0x00, 0x00, 0x00, 0x00},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 0,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -81,7 +81,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 17,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -108,24 +108,24 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
func TestParseSubscribeDoneMessage(t *testing.T) {
cases := []struct {
data []byte
expect *SubscribeDoneMessage
expect *PublishDoneMessage
err error
}{
{
data: nil,
expect: &SubscribeDoneMessage{},
expect: &PublishDoneMessage{},
err: io.EOF,
},
{
data: []byte{},
expect: &SubscribeDoneMessage{},
expect: &PublishDoneMessage{},
err: io.EOF,
},
{
data: []byte{
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
},
expect: &SubscribeDoneMessage{
expect: &PublishDoneMessage{
RequestID: 0,
StatusCode: 0,
StreamCount: 0,
Expand All @@ -143,7 +143,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) {
0x02,
0x03,
},
expect: &SubscribeDoneMessage{
expect: &PublishDoneMessage{
RequestID: 0,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -159,7 +159,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) {
0x06, 'r', 'e', 'a', 's', 'o', 'n',
0x00,
},
expect: &SubscribeDoneMessage{
expect: &PublishDoneMessage{
RequestID: 0,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -171,7 +171,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) {
data: []byte{
0x00, 0x00, 0x00, 0x00,
},
expect: &SubscribeDoneMessage{
expect: &PublishDoneMessage{
RequestID: 0,
StatusCode: 0,
StreamCount: 0,
Expand All @@ -182,7 +182,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) {
}
for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
res := &SubscribeDoneMessage{}
res := &PublishDoneMessage{}
err := res.parse(CurrentVersion, tc.data)
assert.Equal(t, tc.expect, res)
if tc.err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/wire/publish_error_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type PublishErrorMessage struct {

func (m *PublishErrorMessage) LogValue() slog.Value {
return slog.GroupValue(
slog.String("type", "subscribe_error"),
slog.String("type", "publish_error"),
slog.Uint64("request_id", m.RequestID),
slog.Uint64("error_code", m.ErrorCode),
slog.String("reason", m.ReasonPhrase),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,37 @@ import (
"github.com/quic-go/quic-go/quicvarint"
)

type AnnounceCancelMessage struct {
type PublishNamespaceCancelMessage struct {
TrackNamespace Tuple
ErrorCode uint64
ReasonPhrase string
}

func (m *AnnounceCancelMessage) LogValue() slog.Value {
func (m *PublishNamespaceCancelMessage) LogValue() slog.Value {
return slog.GroupValue(
slog.String("type", "announce_cancel"),
slog.String("type", "publish_namespace_cancel"),
slog.Any("track_namespace", m.TrackNamespace),
slog.Uint64("error_code", m.ErrorCode),
slog.String("reason", m.ReasonPhrase),
)
}

func (m AnnounceCancelMessage) GetTrackNamespace() string {
func (m PublishNamespaceCancelMessage) GetTrackNamespace() string {
return m.TrackNamespace.String()
}

func (m AnnounceCancelMessage) Type() controlMessageType {
return messageTypeAnnounce
func (m PublishNamespaceCancelMessage) Type() controlMessageType {
return messageTypePublishNamespace
}

func (m *AnnounceCancelMessage) Append(buf []byte) []byte {
func (m *PublishNamespaceCancelMessage) Append(buf []byte) []byte {
buf = m.TrackNamespace.append(buf)
buf = quicvarint.Append(buf, m.ErrorCode)
buf = appendVarIntBytes(buf, []byte(m.ReasonPhrase))
return buf
}

func (m *AnnounceCancelMessage) parse(_ Version, data []byte) (err error) {
func (m *PublishNamespaceCancelMessage) parse(_ Version, data []byte) (err error) {
var n int
m.TrackNamespace, n, err = parseTuple(data)
if err != nil {
Expand Down
Loading