Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ const (
MessageAnnounce = "ANNOUNCE"
MessageAnnounceCancel = "ANNOUNCE_CANCEL"
MessageUnannounce = "UNANNOUNCE"
MessageTrackStatusRequest = "TRACK_STATUS_REQUEST"
MessageTrackStatus = "TRACK_STATUS"
MessageGoAway = "GO_AWAY"
MessageSubscribeAnnounces = "SUBSCRIBE_ANNOUNCES"
MessageUnsubscribeAnnounces = "UNSUBSCRIBE_ANNOUNCES"
Expand Down Expand Up @@ -70,17 +68,6 @@ type FetchPublisher interface {
FetchStream() (*FetchStream, error)
}

// StatusRequestHandler is the interface implemented by ResponseWriters of
// TrackStatusRequest messages. The first call to Accept sends the response.
// Calling Reject sets the status to "track does not exist" and then calls
// Accept. Reject ignores the errorCode and reasonPhrase. Applications are
// responsible for following the ruls of track status messages.
type StatusRequestHandler interface {
// SetStatus sets the status for the response. Call this before calling
// Accept.
SetStatus(statusCode, lastGroupID, lastObjectID uint64)
}

// Handler is the handler interface for non-specific MoQ messages.
type Handler interface {
Handle(ResponseWriter, *Message)
Expand Down
30 changes: 0 additions & 30 deletions internal/wire/announce_ok_message.go

This file was deleted.

47 changes: 28 additions & 19 deletions internal/wire/control_message_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) {
m = &RequestsBlockedMessage{}

case messageTypeSubscribe:
m = &SubscribeMessage{}
m = &SubscribeMessage{TrackStatus: false}
case messageTypeSubscribeOk:
m = &SubscribeOkMessage{}
case messageTypeSubscribeError:
Expand All @@ -67,8 +67,15 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) {
m = &UnsubscribeMessage{}
case messageTypeSubscribeUpdate:
m = &SubscribeUpdateMessage{}
case messageTypeSubscribeDone:
m = &SubscribeDoneMessage{}

case messageTypePublishDone:
m = &PublishDoneMessage{}
case messageTypePublish:
m = &PublishMessage{}
case messageTypePublishOk:
m = &PublishOkMessage{}
case messageTypePublishError:
m = &PublishErrorMessage{}

case messageTypeFetch:
m = &FetchMessage{}
Expand All @@ -80,29 +87,31 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) {
m = &FetchCancelMessage{}

case messageTypeTrackStatus:
m = &TrackStatusRequestMessage{}
m = &SubscribeMessage{TrackStatus: true}
case messageTypeTrackStatusOk:
m = &TrackStatusMessage{}
m = &SubscribeOkMessage{}
case messageTypeTrackStatusError:
m = &SubscribeErrorMessage{}

case messageTypeAnnounce:
m = &AnnounceMessage{}
case messageTypeAnnounceOk:
m = &AnnounceOkMessage{}
case messageTypeAnnounceError:
m = &AnnounceErrorMessage{}
case messageTypeUnannounce:
m = &UnannounceMessage{}
case messageTypeAnnounceCancel:
m = &AnnounceCancelMessage{}
case messageTypePublishNamespace:
m = &PublishNamespaceMessage{}
case messageTypePublishNamespaceOk:
m = &PublishNamespaceOkMessage{}
case messageTypePublishNamespaceError:
m = &PublishNamespaceErrorMessage{}
case messageTypePublishNamespaceDone:
m = &PublishNamespaceDoneMessage{}
case messageTypePublishNamespaceCancel:
m = &PublishNamespaceCancelMessage{}

case messageTypeSubscribeNamespace:
m = &SubscribeAnnouncesMessage{}
m = &SubscribeNamespaceMessage{}
case messageTypeSubscribeNamespaceOk:
m = &SubscribeAnnouncesOkMessage{}
m = &SubscribeNamespaceOkMessage{}
case messageTypeSubscribeNamespaceError:
m = &SubscribeAnnouncesErrorMessage{}
m = &SubscribeNamespaceErrorMessage{}
case messageTypeUnsubscribeNamespace:
m = &UnsubscribeAnnouncesMessage{}
m = &UnsubscribeNamespaceMessage{}
default:
return nil, fmt.Errorf("%w: %v", errInvalidMessageType, mt)
}
Expand Down
36 changes: 18 additions & 18 deletions internal/wire/control_message_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ const (
messageTypeSubscribeError controlMessageType = 0x05
messageTypeSubscribeUpdate controlMessageType = 0x02
messageTypeUnsubscribe controlMessageType = 0x0a
messageTypeSubscribeDone controlMessageType = 0x0b

messageTypePublishDone controlMessageType = 0x0b
messageTypePublish controlMessageType = 0x1d
messageTypePublishOk controlMessageType = 0x1e
messageTypePublishError controlMessageType = 0x1f
Expand All @@ -37,11 +37,11 @@ const (
messageTypeTrackStatusOk controlMessageType = 0x0e
messageTypeTrackStatusError controlMessageType = 0x0f

messageTypeAnnounce controlMessageType = 0x06
messageTypeAnnounceOk controlMessageType = 0x07
messageTypeAnnounceError controlMessageType = 0x08
messageTypeUnannounce controlMessageType = 0x09
messageTypeAnnounceCancel controlMessageType = 0x0c
messageTypePublishNamespace controlMessageType = 0x06
messageTypePublishNamespaceOk controlMessageType = 0x07
messageTypePublishNamespaceError controlMessageType = 0x08
messageTypePublishNamespaceDone controlMessageType = 0x09
messageTypePublishNamespaceCancel controlMessageType = 0x0c

messageTypeSubscribeNamespace controlMessageType = 0x11
messageTypeSubscribeNamespaceOk controlMessageType = 0x12
Expand Down Expand Up @@ -74,9 +74,9 @@ func (mt controlMessageType) String() string {
return "Unsubscribe"
case messageTypeSubscribeUpdate:
return "SubscribeUpdate"
case messageTypeSubscribeDone:
return "SubscribeDone"

case messageTypePublishDone:
return "PublishDone"
case messageTypePublish:
return "Publish"
case messageTypePublishOk:
Expand All @@ -100,16 +100,16 @@ func (mt controlMessageType) String() string {
case messageTypeTrackStatusError:
return "TrackStatusError"

case messageTypeAnnounce:
return "Announce"
case messageTypeAnnounceOk:
return "AnnounceOk"
case messageTypeAnnounceError:
return "AnnounceError"
case messageTypeUnannounce:
return "Unannounce"
case messageTypeAnnounceCancel:
return "AnnounceCancel"
case messageTypePublishNamespace:
return "PublishNamespace"
case messageTypePublishNamespaceOk:
return "PublishNamespaceOk"
case messageTypePublishNamespaceError:
return "PublishNamespaceError"
case messageTypePublishNamespaceDone:
return "PublishNamespaceDone"
case messageTypePublishNamespaceCancel:
return "PublishNamespaceCancel"

case messageTypeSubscribeNamespace:
return "SubscribeNamespace"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,36 @@ import (
"github.com/quic-go/quic-go/quicvarint"
)

type SubscribeDoneMessage struct {
type PublishDoneMessage struct {
RequestID uint64
StatusCode uint64
StreamCount uint64
ReasonPhrase string
}

func (m *SubscribeDoneMessage) LogValue() slog.Value {
func (m *PublishDoneMessage) LogValue() slog.Value {
return slog.GroupValue(
slog.String("type", "subscribe_done"),
slog.String("type", "publish_done"),
slog.Uint64("request_id", m.RequestID),
slog.Uint64("status_code", m.StatusCode),
slog.Uint64("stream_count", m.StreamCount),
slog.String("reason", m.ReasonPhrase),
)
}

func (m SubscribeDoneMessage) Type() controlMessageType {
return messageTypeSubscribeDone
func (m PublishDoneMessage) Type() controlMessageType {
return messageTypePublishDone
}

func (m *SubscribeDoneMessage) Append(buf []byte) []byte {
func (m *PublishDoneMessage) Append(buf []byte) []byte {
buf = quicvarint.Append(buf, m.RequestID)
buf = quicvarint.Append(buf, m.StatusCode)
buf = quicvarint.Append(buf, m.StreamCount)
buf = appendVarIntBytes(buf, []byte(m.ReasonPhrase))
return buf
}

func (m *SubscribeDoneMessage) parse(_ Version, data []byte) (err error) {
func (m *PublishDoneMessage) parse(_ Version, data []byte) (err error) {
var n int
m.RequestID, n, err = quicvarint.Parse(data)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (

func TestSubscribeDoneMessageAppend(t *testing.T) {
cases := []struct {
srm SubscribeDoneMessage
srm PublishDoneMessage
buf []byte
expect []byte
}{
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 0,
StatusCode: 0,
StreamCount: 0,
Expand All @@ -25,7 +25,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
expect: []byte{0x00, 0x00, 0x00, 0x00},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 0,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -40,7 +40,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 17,
StatusCode: 1,
StreamCount: 4,
Expand All @@ -56,7 +56,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 0,
StatusCode: 0,
StreamCount: 0,
Expand All @@ -66,7 +66,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
expect: []byte{0x00, 0x00, 0x00, 0x00},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 0,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -81,7 +81,7 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
},
},
{
srm: SubscribeDoneMessage{
srm: PublishDoneMessage{
RequestID: 17,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -108,24 +108,24 @@ func TestSubscribeDoneMessageAppend(t *testing.T) {
func TestParseSubscribeDoneMessage(t *testing.T) {
cases := []struct {
data []byte
expect *SubscribeDoneMessage
expect *PublishDoneMessage
err error
}{
{
data: nil,
expect: &SubscribeDoneMessage{},
expect: &PublishDoneMessage{},
err: io.EOF,
},
{
data: []byte{},
expect: &SubscribeDoneMessage{},
expect: &PublishDoneMessage{},
err: io.EOF,
},
{
data: []byte{
0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
},
expect: &SubscribeDoneMessage{
expect: &PublishDoneMessage{
RequestID: 0,
StatusCode: 0,
StreamCount: 0,
Expand All @@ -143,7 +143,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) {
0x02,
0x03,
},
expect: &SubscribeDoneMessage{
expect: &PublishDoneMessage{
RequestID: 0,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -159,7 +159,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) {
0x06, 'r', 'e', 'a', 's', 'o', 'n',
0x00,
},
expect: &SubscribeDoneMessage{
expect: &PublishDoneMessage{
RequestID: 0,
StatusCode: 1,
StreamCount: 2,
Expand All @@ -171,7 +171,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) {
data: []byte{
0x00, 0x00, 0x00, 0x00,
},
expect: &SubscribeDoneMessage{
expect: &PublishDoneMessage{
RequestID: 0,
StatusCode: 0,
StreamCount: 0,
Expand All @@ -182,7 +182,7 @@ func TestParseSubscribeDoneMessage(t *testing.T) {
}
for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
res := &SubscribeDoneMessage{}
res := &PublishDoneMessage{}
err := res.parse(CurrentVersion, tc.data)
assert.Equal(t, tc.expect, res)
if tc.err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/wire/publish_error_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type PublishErrorMessage struct {

func (m *PublishErrorMessage) LogValue() slog.Value {
return slog.GroupValue(
slog.String("type", "subscribe_error"),
slog.String("type", "publish_error"),
slog.Uint64("request_id", m.RequestID),
slog.Uint64("error_code", m.ErrorCode),
slog.String("reason", m.ReasonPhrase),
Expand Down
Loading