Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions announcement_response_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package moqtransport

type announcementResponseWriter struct {
namespace []string
transport *Transport
session *Session
}

func (a *announcementResponseWriter) Accept() error {
return a.transport.acceptAnnouncement(a.namespace)
return a.session.acceptAnnouncement(a.namespace)
}

func (a *announcementResponseWriter) Reject(code uint64, reason string) error {
return a.transport.rejectAnnouncement(a.namespace, code, reason)
return a.session.rejectAnnouncement(a.namespace, code, reason)
}
8 changes: 4 additions & 4 deletions announcement_subscription_response_writer.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package moqtransport

type announcementSubscriptionResponseWriter struct {
prefix []string
transport *Transport
prefix []string
session *Session
}

func (a *announcementSubscriptionResponseWriter) Accept() error {
return a.transport.acceptAnnouncementSubscription(a.prefix)
return a.session.acceptAnnouncementSubscription(a.prefix)
}

func (a *announcementSubscriptionResponseWriter) Reject(code uint64, reason string) error {
return a.transport.rejectAnnouncementSubscription(a.prefix, code, reason)
return a.session.rejectAnnouncementSubscription(a.prefix, code, reason)
}
49 changes: 0 additions & 49 deletions callbacks.go

This file was deleted.

109 changes: 109 additions & 0 deletions control_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package moqtransport

import (
"context"
"io"
"log/slog"

"github.com/mengelbart/moqtransport/internal/wire"
)

type controlMessageReceiver interface {
receive(wire.ControlMessage) error
}

type controlMessageParser interface {
Parse() (wire.ControlMessage, error)
}

func newControlMessageParser(r io.Reader) controlMessageParser {
return wire.NewControlMessageParser(r)
}

type controlMessageSender interface {
QueueControlMessage(wire.ControlMessage) error
close(err error)
}

type controlStream struct {
logger *slog.Logger
ctx context.Context
cancelCtx context.CancelCauseFunc
queue chan wire.ControlMessage
}

func newControlStream() *controlStream {
ctx, cancel := context.WithCancelCause(context.Background())
cs := &controlStream{
ctx: ctx,
cancelCtx: cancel,
queue: make(chan wire.ControlMessage, 100),
}
return cs
}

func (s *controlStream) accept(conn Connection, receiver controlMessageReceiver) {
stream, err := conn.AcceptStream(s.ctx)
if err != nil {
s.close(err)
return
}
s.logger = defaultLogger.With("perspective", conn.Perspective())
go s.sendLoop(stream)
go s.receiveLoop(newControlMessageParser(stream), receiver)
}

func (s *controlStream) open(conn Connection, receiver controlMessageReceiver) {
stream, err := conn.OpenStreamSync(s.ctx)
if err != nil {
// TODO: close transport and session?
panic(err)
}
s.logger = defaultLogger.With("perspective", conn.Perspective())
go s.sendLoop(stream)
go s.receiveLoop(newControlMessageParser(stream), receiver)
}

// queueControlMessage implements controlMessageSender.
func (s *controlStream) QueueControlMessage(msg wire.ControlMessage) error {
select {
case <-s.ctx.Done():
return context.Cause(s.ctx)
case s.queue <- msg:
return nil
default:
return ErrControlMessageQueueOverflow
}
}

func (s *controlStream) sendLoop(writer io.Writer) {
for {
select {
case <-s.ctx.Done():
return
case msg := <-s.queue:
s.logger.Info("sending message", "type", msg.Type().String(), "msg", msg)
_, err := writer.Write(compileMessage(msg))
if err != nil {
s.cancelCtx(err)
return
}
}
}
}

func (s *controlStream) receiveLoop(parser controlMessageParser, receiver controlMessageReceiver) {
for {
msg, err := parser.Parse()
if err != nil {
return
}
if err = receiver.receive(msg); err != nil {
return
}
}
}

func (s *controlStream) close(err error) {
s.cancelCtx(err)
}
82 changes: 41 additions & 41 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,69 @@ package moqtransport

// Generic error codes
const (
ErrorCodeNoError = 0x00
ErrorCodeInternal = 0x01
ErrorCodeUnauthorized = 0x02
ErrorCodeProtocolViolation = 0x03
ErrorCodeDuplicateTrackAlias = 0x04
ErrorCodeParameterLengthMismatch = 0x05
ErrorCodeTooManySubscribes = 0x06
ErrorCodeGoAwayTimeout = 0x10
ErrorCodeControlMessageTimeout = 0x11
ErrorCodeDataStreamTimeout = 0x12
ErrorCodeNoError uint64 = 0x00
ErrorCodeInternal uint64 = 0x01
ErrorCodeUnauthorized uint64 = 0x02
ErrorCodeProtocolViolation uint64 = 0x03
ErrorCodeDuplicateTrackAlias uint64 = 0x04
ErrorCodeParameterLengthMismatch uint64 = 0x05
ErrorCodeTooManySubscribes uint64 = 0x06
ErrorCodeGoAwayTimeout uint64 = 0x10
ErrorCodeControlMessageTimeout uint64 = 0x11
ErrorCodeDataStreamTimeout uint64 = 0x12

// Errors not included in current draft
ErrorCodeUnsupportedVersion = 0xff01
ErrorCodeUnsupportedVersion uint64 = 0xff01
)

// Announcement error codes
const (
ErrorCodeAnnouncementInternalError = 0x00
ErrorCodeAnnouncementUnauthorized = 0x01
ErrorCodeAnnouncementTimeout = 0x02
ErrorCodeAnnouncementNotSupported = 0x03
ErrorCodeAnnouncementUninterested = 0x04
ErrorCodeAnnouncementInternalError uint64 = 0x00
ErrorCodeAnnouncementUnauthorized uint64 = 0x01
ErrorCodeAnnouncementTimeout uint64 = 0x02
ErrorCodeAnnouncementNotSupported uint64 = 0x03
ErrorCodeAnnouncementUninterested uint64 = 0x04
)

// Subscribe error codes
const (
ErrorCodeSubscribeInternal = 0x00
ErrorCodeSubscribeUnauthorized = 0x01
ErrorCodeSubscribeTimeout = 0x02
ErrorCodeSubscribeNotSupported = 0x03
ErrorCodeSubscribeTrackDoesNotExist = 0x04
ErrorCodeSubscribeInvalidRange = 0x05
ErrorCodeSubscribeRetryTrackAlias = 0x06
ErrorCodeSubscribeInternal uint64 = 0x00
ErrorCodeSubscribeUnauthorized uint64 = 0x01
ErrorCodeSubscribeTimeout uint64 = 0x02
ErrorCodeSubscribeNotSupported uint64 = 0x03
ErrorCodeSubscribeTrackDoesNotExist uint64 = 0x04
ErrorCodeSubscribeInvalidRange uint64 = 0x05
ErrorCodeSubscribeRetryTrackAlias uint64 = 0x06
)

// Fetch error codes
const (
ErrorCodeFetchInternalError = 0x00
ErrorCodeFetchUnauthorized = 0x01
ErrorCodeFetchTimeout = 0x02
ErrorCodeFetchNotSupported = 0x03
ErrorCodeFetchTrackDoesNotExist = 0x04
ErrorCodeFetchInvalidRange = 0x05
ErrorCodeFetchInternalError uint64 = 0x00
ErrorCodeFetchUnauthorized uint64 = 0x01
ErrorCodeFetchTimeout uint64 = 0x02
ErrorCodeFetchNotSupported uint64 = 0x03
ErrorCodeFetchTrackDoesNotExist uint64 = 0x04
ErrorCodeFetchInvalidRange uint64 = 0x05
)

// Subscribe done error codes
const (
ErrorCodeSubscribeDoneInternalError = 0x00
ErrorCodeSubscribeDoneUnauthorized = 0x01
ErrorCodeSubscribeDoneTrackEnded = 0x02
ErrorCodeSubscribeDoneSubscriptionEnded = 0x03
ErrorCodeSubscribeDoneGoingAway = 0x04
ErrorCodeSubscribeDoneExpired = 0x05
ErrorCodeSubscribeDoneTooFarBehind = 0x06
ErrorCodeSubscribeDoneInternalError uint64 = 0x00
ErrorCodeSubscribeDoneUnauthorized uint64 = 0x01
ErrorCodeSubscribeDoneTrackEnded uint64 = 0x02
ErrorCodeSubscribeDoneSubscriptionEnded uint64 = 0x03
ErrorCodeSubscribeDoneGoingAway uint64 = 0x04
ErrorCodeSubscribeDoneExpired uint64 = 0x05
ErrorCodeSubscribeDoneTooFarBehind uint64 = 0x06
)

// Subscribe Announces error codes
const (
ErrorCodeSubscribeAnnouncesInternalError = 0x00
ErrorCodeSubscribeAnnouncesUnauthorized = 0x01
ErrorCodeSubscribeAnnouncesTimeout = 0x02
ErrorCodeSubscribeAnnouncesNotSupported = 0x03
ErrorCodeSubscribeAnnouncesNamespacePrefixUnknown = 0x04
ErrorCodeSubscribeAnnouncesInternalError uint64 = 0x00
ErrorCodeSubscribeAnnouncesUnauthorized uint64 = 0x01
ErrorCodeSubscribeAnnouncesTimeout uint64 = 0x02
ErrorCodeSubscribeAnnouncesNotSupported uint64 = 0x03
ErrorCodeSubscribeAnnouncesNamespacePrefixUnknown uint64 = 0x04
)

// ProtocolError is a MoQ protocol error
Expand Down
Binary file added examples/date/date
Binary file not shown.
Loading