Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/date/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ func (p *publisher) SendDatagram(o moqtransport.Object) error {
return p.p.SendDatagram(o)
}

func (p *publisher) OpenSubgroup(groupID, subgroupID uint64, priority uint8) (*moqtransport.Subgroup, error) {
func (p *publisher) OpenSubgroup(groupID, subgroupID uint64, priority uint8, opts ...moqtransport.SubgroupOption) (*moqtransport.Subgroup, error) {
log.Printf("sessionNr: %d, requestID: %d, groupID: %d, subgroupID: %v",
p.sessionID, p.requestID, groupID, subgroupID)
return p.p.OpenSubgroup(groupID, subgroupID, priority)
return p.p.OpenSubgroup(groupID, subgroupID, priority, opts...)
}

func (p *publisher) CloseWithError(code uint64, reason string) error {
Expand Down
18 changes: 17 additions & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,29 @@ type ResponseWriter interface {
Reject(code uint64, reason string) error
}

// SubgroupOption configures optional subgroup properties.
type SubgroupOption func(*subgroupOptions)

type subgroupOptions struct {
endOfGroup bool
}

// WithEndOfGroup signals that this subgroup stream will contain the last
// object in the group. Per draft-14, this sets the "Contains End of Group"
// bit in the SUBGROUP_HEADER stream type.
func WithEndOfGroup() SubgroupOption {
return func(o *subgroupOptions) {
o.endOfGroup = true
}
}

// Publisher is the interface implemented by SubscribeResponseWriters
type Publisher interface {
// SendDatagram sends an object in a datagram.
SendDatagram(Object) error

// OpenSubgroup opens and returns a new subgroup.
OpenSubgroup(groupID, subgroupID uint64, priority uint8) (*Subgroup, error)
OpenSubgroup(groupID, subgroupID uint64, priority uint8, opts ...SubgroupOption) (*Subgroup, error)

// CloseWithError closes the track and sends SUBSCRIBE_DONE with code and
// reason.
Expand Down
30 changes: 0 additions & 30 deletions internal/wire/announce_ok_message.go

This file was deleted.

8 changes: 4 additions & 4 deletions internal/wire/client_setup_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ func TestClientSetupMessageAppend(t *testing.T) {
},
{
csm: ClientSetupMessage{
SupportedVersions: []Version{Draft_ietf_moq_transport_00},
SupportedVersions: []Version{CurrentVersion},
SetupParameters: KVPList{},
},
buf: []byte{},
expect: []byte{
0x01, 0xc0, 0x00, 0x00, 0x00, 0xff, 0x00, 0x00, 0x00, 0x00,
0x01, 0xc0, 0x00, 0x00, 0x00, 0xff, 0x00, 0x00, 0x0e, 0x00,
},
},
{
csm: ClientSetupMessage{
SupportedVersions: []Version{Draft_ietf_moq_transport_00},
SupportedVersions: []Version{CurrentVersion},
SetupParameters: KVPList{
KeyValuePair{
Type: PathParameterKey,
Expand All @@ -46,7 +46,7 @@ func TestClientSetupMessageAppend(t *testing.T) {
},
buf: []byte{},
expect: []byte{
0x01, 0xc0, 0x00, 0x00, 0x00, 0xff, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, 'A',
0x01, 0xc0, 0x00, 0x00, 0x00, 0xff, 0x00, 0x00, 0x0e, 0x01, 0x01, 0x01, 'A',
},
},
}
Expand Down
36 changes: 18 additions & 18 deletions internal/wire/control_message_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) {
m = &UnsubscribeMessage{}
case messageTypeSubscribeUpdate:
m = &SubscribeUpdateMessage{}
case messageTypeSubscribeDone:
m = &SubscribeDoneMessage{}
case messageTypePublishDone:
m = &PublishDoneMessage{}

case messageTypeFetch:
m = &FetchMessage{}
Expand All @@ -80,29 +80,29 @@ func (p *ControlMessageParser) Parse() (ControlMessage, error) {
m = &FetchCancelMessage{}

case messageTypeTrackStatus:
m = &TrackStatusRequestMessage{}
case messageTypeTrackStatusOk:
m = &TrackStatusMessage{}
case messageTypeTrackStatusOk:
m = &TrackStatusOkMessage{}

case messageTypeAnnounce:
m = &AnnounceMessage{}
case messageTypeAnnounceOk:
m = &AnnounceOkMessage{}
case messageTypeAnnounceError:
m = &AnnounceErrorMessage{}
case messageTypeUnannounce:
m = &UnannounceMessage{}
case messageTypeAnnounceCancel:
m = &AnnounceCancelMessage{}
case messageTypePublishNamespace:
m = &PublishNamespaceMessage{}
case messageTypePublishNamespaceOk:
m = &PublishNamespaceOkMessage{}
case messageTypePublishNamespaceError:
m = &PublishNamespaceErrorMessage{}
case messageTypePublishNamespaceDone:
m = &PublishNamespaceDoneMessage{}
case messageTypePublishNamespaceCancel:
m = &PublishNamespaceCancelMessage{}

case messageTypeSubscribeNamespace:
m = &SubscribeAnnouncesMessage{}
m = &SubscribeNamespaceMessage{}
case messageTypeSubscribeNamespaceOk:
m = &SubscribeAnnouncesOkMessage{}
m = &SubscribeNamespaceOkMessage{}
case messageTypeSubscribeNamespaceError:
m = &SubscribeAnnouncesErrorMessage{}
m = &SubscribeNamespaceErrorMessage{}
case messageTypeUnsubscribeNamespace:
m = &UnsubscribeAnnouncesMessage{}
m = &UnsubscribeNamespaceMessage{}
default:
return nil, fmt.Errorf("%w: %v", errInvalidMessageType, mt)
}
Expand Down
36 changes: 18 additions & 18 deletions internal/wire/control_message_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ const (
messageTypeSubscribeError controlMessageType = 0x05
messageTypeSubscribeUpdate controlMessageType = 0x02
messageTypeUnsubscribe controlMessageType = 0x0a
messageTypeSubscribeDone controlMessageType = 0x0b

messageTypePublishDone controlMessageType = 0x0b
messageTypePublish controlMessageType = 0x1d
messageTypePublishOk controlMessageType = 0x1e
messageTypePublishError controlMessageType = 0x1f
Expand All @@ -37,11 +37,11 @@ const (
messageTypeTrackStatusOk controlMessageType = 0x0e
messageTypeTrackStatusError controlMessageType = 0x0f

messageTypeAnnounce controlMessageType = 0x06
messageTypeAnnounceOk controlMessageType = 0x07
messageTypeAnnounceError controlMessageType = 0x08
messageTypeUnannounce controlMessageType = 0x09
messageTypeAnnounceCancel controlMessageType = 0x0c
messageTypePublishNamespace controlMessageType = 0x06
messageTypePublishNamespaceOk controlMessageType = 0x07
messageTypePublishNamespaceError controlMessageType = 0x08
messageTypePublishNamespaceDone controlMessageType = 0x09
messageTypePublishNamespaceCancel controlMessageType = 0x0c

messageTypeSubscribeNamespace controlMessageType = 0x11
messageTypeSubscribeNamespaceOk controlMessageType = 0x12
Expand Down Expand Up @@ -74,9 +74,9 @@ func (mt controlMessageType) String() string {
return "Unsubscribe"
case messageTypeSubscribeUpdate:
return "SubscribeUpdate"
case messageTypeSubscribeDone:
return "SubscribeDone"

case messageTypePublishDone:
return "PublishDone"
case messageTypePublish:
return "Publish"
case messageTypePublishOk:
Expand All @@ -100,16 +100,16 @@ func (mt controlMessageType) String() string {
case messageTypeTrackStatusError:
return "TrackStatusError"

case messageTypeAnnounce:
return "Announce"
case messageTypeAnnounceOk:
return "AnnounceOk"
case messageTypeAnnounceError:
return "AnnounceError"
case messageTypeUnannounce:
return "Unannounce"
case messageTypeAnnounceCancel:
return "AnnounceCancel"
case messageTypePublishNamespace:
return "PublishNamespace"
case messageTypePublishNamespaceOk:
return "PublishNamespaceOk"
case messageTypePublishNamespaceError:
return "PublishNamespaceError"
case messageTypePublishNamespaceDone:
return "PublishNamespaceDone"
case messageTypePublishNamespaceCancel:
return "PublishNamespaceCancel"

case messageTypeSubscribeNamespace:
return "SubscribeNamespace"
Expand Down
58 changes: 44 additions & 14 deletions internal/wire/object_stream_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,47 @@ import (
type StreamType uint64

const (
StreamTypeFetch StreamType = 0x05
StreamTypeSubgroupZeroSIDNoExt StreamType = 0x08
StreamTypeSubgroupZeroSIDExt StreamType = 0x09
StreamTypeSubgroupNoSIDNoExt StreamType = 0x0a
StreamTypeSubgroupNoSIDExt StreamType = 0x0b
StreamTypeSubgroupSIDNoExt StreamType = 0x0c
StreamTypeSubgroupSIDExt StreamType = 0x0d
StreamTypeFetch StreamType = 0x05

// Subgroup header types without End of Group (0x10-0x15)
StreamTypeSubgroupZeroSIDNoExt StreamType = 0x10
StreamTypeSubgroupZeroSIDExt StreamType = 0x11
StreamTypeSubgroupNoSIDNoExt StreamType = 0x12
StreamTypeSubgroupNoSIDExt StreamType = 0x13
StreamTypeSubgroupSIDNoExt StreamType = 0x14
StreamTypeSubgroupSIDExt StreamType = 0x15

// Subgroup header types with End of Group (0x18-0x1D)
StreamTypeSubgroupZeroSIDNoExtEOG StreamType = 0x18
StreamTypeSubgroupZeroSIDExtEOG StreamType = 0x19
StreamTypeSubgroupNoSIDNoExtEOG StreamType = 0x1A
StreamTypeSubgroupNoSIDExtEOG StreamType = 0x1B
StreamTypeSubgroupSIDNoExtEOG StreamType = 0x1C
StreamTypeSubgroupSIDExtEOG StreamType = 0x1D
)

var (
errInvalidStreamType = errors.New("invalid stream type")
)

func isSubgroupStreamType(st StreamType) bool {
return (st >= 0x10 && st <= 0x15) || (st >= 0x18 && st <= 0x1D)
}

func subgroupHasExplicitSID(st StreamType) bool {
return st == StreamTypeSubgroupSIDNoExt || st == StreamTypeSubgroupSIDExt ||
st == StreamTypeSubgroupSIDNoExtEOG || st == StreamTypeSubgroupSIDExtEOG
}

func subgroupSIDIsFirstObjectID(st StreamType) bool {
return st == StreamTypeSubgroupNoSIDNoExt || st == StreamTypeSubgroupNoSIDExt ||
st == StreamTypeSubgroupNoSIDNoExtEOG || st == StreamTypeSubgroupNoSIDExtEOG
}

func subgroupContainsEndOfGroup(st StreamType) bool {
return st >= 0x18 && st <= 0x1D
}

type ObjectStreamParser struct {
qlogger *qlog.Logger
streamID uint64
Expand All @@ -42,6 +70,7 @@ type ObjectStreamParser struct {
PublisherPriority uint8
GroupID uint64
SubgroupID uint64
EndOfGroup bool
}

func (p *ObjectStreamParser) Type() StreamType {
Expand Down Expand Up @@ -83,7 +112,7 @@ func NewObjectStreamParser(r io.Reader, streamID uint64, qlogger *qlog.Logger) (
SubgroupID: 0,
}, nil
}
if streamType >= 0x08 && streamType <= 0x0d {
if isSubgroupStreamType(streamType) {
if qlogger != nil {
qlogger.Log(moqt.StreamTypeSetEvent{
Owner: moqt.GetOwner(moqt.OwnerRemote),
Expand All @@ -95,9 +124,9 @@ func NewObjectStreamParser(r io.Reader, streamID uint64, qlogger *qlog.Logger) (
// objects
ext := streamType&0x01 > 0

// Only read subgroup ID from header if type is 0x0c or 0x0d. In all
// other cases, it is either zero or will be read from the first object.
sid := streamType == 0x0c || streamType == 0x0d
// Only read subgroup ID from header if type has explicit SID field.
// In all other cases, it is either zero or will be read from the first object.
sid := subgroupHasExplicitSID(streamType)

var shsm SubgroupHeaderMessage
if err := shsm.parse(br, sid); err != nil {
Expand All @@ -109,13 +138,14 @@ func NewObjectStreamParser(r io.Reader, streamID uint64, qlogger *qlog.Logger) (
reader: br,
typ: streamType,
identifier: shsm.TrackAlias,
// if stream type is 0x0a or 0x0b, we don't yet know the subgroup ID
// if subgroup ID comes from first object ID, we don't yet know it
// because it will only be read when the first object is parsed.
hasSubgroupID: streamType != 0x0a && streamType != 0x0b,
hasSubgroupID: !subgroupSIDIsFirstObjectID(streamType),
hasExtensions: ext,
PublisherPriority: shsm.PublisherPriority,
GroupID: shsm.GroupID,
SubgroupID: shsm.SubgroupID,
EndOfGroup: subgroupContainsEndOfGroup(streamType),
}, nil
}
return nil, fmt.Errorf("%w: %v", errInvalidStreamType, st)
Expand Down Expand Up @@ -239,7 +269,7 @@ func (p *ObjectStreamParser) Parse() (*ObjectMessage, error) {
if p.typ == StreamTypeFetch {
return p.parseFetchObject()
}
if p.typ >= 0x08 && p.typ <= 0x0d {
if isSubgroupStreamType(p.typ) {
return p.parseSubgroupObject()
}
return nil, errInvalidStreamType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,36 @@ import (
"github.com/quic-go/quic-go/quicvarint"
)

type SubscribeDoneMessage struct {
type PublishDoneMessage struct {
RequestID uint64
StatusCode uint64
StreamCount uint64
ReasonPhrase string
}

func (m *SubscribeDoneMessage) LogValue() slog.Value {
func (m *PublishDoneMessage) LogValue() slog.Value {
return slog.GroupValue(
slog.String("type", "subscribe_done"),
slog.String("type", "publish_done"),
slog.Uint64("request_id", m.RequestID),
slog.Uint64("status_code", m.StatusCode),
slog.Uint64("stream_count", m.StreamCount),
slog.String("reason", m.ReasonPhrase),
)
}

func (m SubscribeDoneMessage) Type() controlMessageType {
return messageTypeSubscribeDone
func (m PublishDoneMessage) Type() controlMessageType {
return messageTypePublishDone
}

func (m *SubscribeDoneMessage) Append(buf []byte) []byte {
func (m *PublishDoneMessage) Append(buf []byte) []byte {
buf = quicvarint.Append(buf, m.RequestID)
buf = quicvarint.Append(buf, m.StatusCode)
buf = quicvarint.Append(buf, m.StreamCount)
buf = appendVarIntBytes(buf, []byte(m.ReasonPhrase))
return buf
}

func (m *SubscribeDoneMessage) parse(_ Version, data []byte) (err error) {
func (m *PublishDoneMessage) parse(_ Version, data []byte) (err error) {
var n int
m.RequestID, n, err = quicvarint.Parse(data)
if err != nil {
Expand Down
Loading
Loading