Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions quic_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,51 @@ import (
"github.com/quic-go/quic-go"
)

type quicGoReceiveStream struct {
type QuicGoReceiveStream struct {
stream *quic.ReceiveStream
}

func (s *quicGoReceiveStream) ID() int64 {
func NewQuicGoReceiveStream(stream *quic.ReceiveStream) *QuicGoReceiveStream {
return &QuicGoReceiveStream{
stream: stream,
}
}

func (s *QuicGoReceiveStream) ID() int64 {
return int64(s.stream.StreamID())
}

func (s *quicGoReceiveStream) CancelRead(c uint64) {
func (s *QuicGoReceiveStream) CancelRead(c uint64) {
s.stream.CancelRead(quic.StreamErrorCode(c))
}

func (c *quicGoReceiveStream) Read(p []byte) (n int, err error) {
func (c *QuicGoReceiveStream) Read(p []byte) (n int, err error) {
return c.stream.Read(p)
}

type quicGoSendStream struct {
type QuicGoSendStream struct {
stream *quic.SendStream
}

func (s *quicGoSendStream) ID() int64 {
func NewQuicstream(stream *quic.SendStream) *QuicGoSendStream {
return &QuicGoSendStream{
stream: stream,
}
}

func (s *QuicGoSendStream) ID() int64 {
return int64(s.stream.StreamID())
}

func (s *quicGoSendStream) Write(b []byte) (int, error) {
func (s *QuicGoSendStream) Write(b []byte) (int, error) {
return s.stream.Write(b)
}

func (s *quicGoSendStream) Close() error {
func (s *QuicGoSendStream) Close() error {
return s.stream.Close()
}

func (s *quicGoSendStream) CancelWrite(c uint64) {
func (s *QuicGoSendStream) CancelWrite(c uint64) {
s.stream.CancelWrite(quic.StreamErrorCode(c))
}

Expand All @@ -65,7 +77,7 @@ func (c *QUICGoConnection) OpenUniStreamSync(ctx context.Context) (SendStream, e
if err != nil {
return nil, err
}
return &quicGoSendStream{
return &QuicGoSendStream{
stream: s,
}, nil
}
Expand All @@ -75,7 +87,7 @@ func (c *QUICGoConnection) AcceptUniStream(ctx context.Context) (ReceiveStream,
if err != nil {
return nil, err
}
return &quicGoReceiveStream{
return &QuicGoReceiveStream{
stream: s,
}, nil
}
Expand Down
49 changes: 37 additions & 12 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,25 @@ type Session struct {
qlog *qlog.Logger
}

// NewSession creates a new roq session. QUIC connection is handled by roq.
func NewSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) {
s := newSession(conn, acceptDatagrams, qlogger)
s.start()

return s, nil
}

// NewSessionWithAppHandeledConn creates a new roq session. QUIC connection is handled by application.
// HandleDatagram and HandleUniStreamWithFlowID have to be called for each datagram / new stream.
func NewSessionWithAppHandeledConn(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) {
s := newSession(conn, acceptDatagrams, qlogger)

return s, nil
}

func newSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) *Session {
ctx, cancel := context.WithCancel(context.Background())
s := &Session{
return &Session{
receiveBufferSize: defaultReceiveBufferSize,
acceptDatagrams: acceptDatagrams,
conn: conn,
Expand All @@ -69,8 +85,6 @@ func NewSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*S
cancelCtx: cancel,
qlog: qlogger,
}
s.start()
return s, nil
}

func (s *Session) start() {
Expand Down Expand Up @@ -176,11 +190,14 @@ func (s *Session) receiveDatagrams() error {
if err != nil {
return err
}
s.handleDatagram(dgram)
s.HandleDatagram(dgram)
}
}

func (s *Session) handleDatagram(datagram []byte) {
// HandleUniStreamWithFlowID handles a datagram.
// If QUIC connection is handled by the application, this function has to be called by the application
// for each datagram that belongs to belongs to the roq connnection.
func (s *Session) HandleDatagram(datagram []byte) {
flowID, n, err := quicvarint.Parse(datagram)
if err != nil {
s.closeWithError(ErrRoQPacketError, "invalid flow ID")
Expand Down Expand Up @@ -220,13 +237,10 @@ func (s *Session) handleDatagram(datagram []byte) {
f.push(b)
}

func (s *Session) handleUniStream(rs ReceiveStream) {
reader := quicvarint.NewReader(rs)
flowID, err := quicvarint.Read(reader)
if err != nil {
s.closeWithError(ErrRoQPacketError, "invalid flow ID")
return
}
// HandleUniStreamWithFlowID handles a new flow with the flowID allready parsed.
// If QUIC connection is handled by the application, this function has to be called by the application
// for each new QUIC stream containing a roq floqID.
func (s *Session) HandleUniStreamWithFlowID(flowID uint64, rs ReceiveStream) {
if s.qlog != nil {
s.qlog.Log(roqqlog.StreamOpenedEvent{
FlowID: flowID,
Expand All @@ -244,3 +258,14 @@ func (s *Session) handleUniStream(rs ReceiveStream) {
}
f.readStream(rs)
}

func (s *Session) handleUniStream(rs ReceiveStream) {
reader := quicvarint.NewReader(rs)
flowID, err := quicvarint.Read(reader)
if err != nil {
s.closeWithError(ErrRoQPacketError, "invalid flow ID")
return
}

s.HandleUniStreamWithFlowID(flowID, rs)
}