diff --git a/quic_connection.go b/quic_connection.go index 38c7255..7db4899 100644 --- a/quic_connection.go +++ b/quic_connection.go @@ -6,39 +6,51 @@ import ( "github.com/quic-go/quic-go" ) -type quicGoReceiveStream struct { +type QuicGoReceiveStream struct { stream *quic.ReceiveStream } -func (s *quicGoReceiveStream) ID() int64 { +func NewQuicGoReceiveStream(stream *quic.ReceiveStream) *QuicGoReceiveStream { + return &QuicGoReceiveStream{ + stream: stream, + } +} + +func (s *QuicGoReceiveStream) ID() int64 { return int64(s.stream.StreamID()) } -func (s *quicGoReceiveStream) CancelRead(c uint64) { +func (s *QuicGoReceiveStream) CancelRead(c uint64) { s.stream.CancelRead(quic.StreamErrorCode(c)) } -func (c *quicGoReceiveStream) Read(p []byte) (n int, err error) { +func (c *QuicGoReceiveStream) Read(p []byte) (n int, err error) { return c.stream.Read(p) } -type quicGoSendStream struct { +type QuicGoSendStream struct { stream *quic.SendStream } -func (s *quicGoSendStream) ID() int64 { +func NewQuicstream(stream *quic.SendStream) *QuicGoSendStream { + return &QuicGoSendStream{ + stream: stream, + } +} + +func (s *QuicGoSendStream) ID() int64 { return int64(s.stream.StreamID()) } -func (s *quicGoSendStream) Write(b []byte) (int, error) { +func (s *QuicGoSendStream) Write(b []byte) (int, error) { return s.stream.Write(b) } -func (s *quicGoSendStream) Close() error { +func (s *QuicGoSendStream) Close() error { return s.stream.Close() } -func (s *quicGoSendStream) CancelWrite(c uint64) { +func (s *QuicGoSendStream) CancelWrite(c uint64) { s.stream.CancelWrite(quic.StreamErrorCode(c)) } @@ -65,7 +77,7 @@ func (c *QUICGoConnection) OpenUniStreamSync(ctx context.Context) (SendStream, e if err != nil { return nil, err } - return &quicGoSendStream{ + return &QuicGoSendStream{ stream: s, }, nil } @@ -75,7 +87,7 @@ func (c *QUICGoConnection) AcceptUniStream(ctx context.Context) (ReceiveStream, if err != nil { return nil, err } - return &quicGoReceiveStream{ + return &QuicGoReceiveStream{ stream: s, }, nil } diff --git a/session.go b/session.go index 62f7576..f17e2a7 100644 --- a/session.go +++ b/session.go @@ -53,9 +53,25 @@ type Session struct { qlog *qlog.Logger } +// NewSession creates a new roq session. QUIC connection is handled by roq. func NewSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) { + s := newSession(conn, acceptDatagrams, qlogger) + s.start() + + return s, nil +} + +// NewSessionWithAppHandeledConn creates a new roq session. QUIC connection is handled by application. +// HandleDatagram and HandleUniStreamWithFlowID have to be called for each datagram / new stream. +func NewSessionWithAppHandeledConn(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) { + s := newSession(conn, acceptDatagrams, qlogger) + + return s, nil +} + +func newSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) *Session { ctx, cancel := context.WithCancel(context.Background()) - s := &Session{ + return &Session{ receiveBufferSize: defaultReceiveBufferSize, acceptDatagrams: acceptDatagrams, conn: conn, @@ -69,8 +85,6 @@ func NewSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*S cancelCtx: cancel, qlog: qlogger, } - s.start() - return s, nil } func (s *Session) start() { @@ -176,11 +190,14 @@ func (s *Session) receiveDatagrams() error { if err != nil { return err } - s.handleDatagram(dgram) + s.HandleDatagram(dgram) } } -func (s *Session) handleDatagram(datagram []byte) { +// HandleUniStreamWithFlowID handles a datagram. +// If QUIC connection is handled by the application, this function has to be called by the application +// for each datagram that belongs to belongs to the roq connnection. +func (s *Session) HandleDatagram(datagram []byte) { flowID, n, err := quicvarint.Parse(datagram) if err != nil { s.closeWithError(ErrRoQPacketError, "invalid flow ID") @@ -220,13 +237,10 @@ func (s *Session) handleDatagram(datagram []byte) { f.push(b) } -func (s *Session) handleUniStream(rs ReceiveStream) { - reader := quicvarint.NewReader(rs) - flowID, err := quicvarint.Read(reader) - if err != nil { - s.closeWithError(ErrRoQPacketError, "invalid flow ID") - return - } +// HandleUniStreamWithFlowID handles a new flow with the flowID allready parsed. +// If QUIC connection is handled by the application, this function has to be called by the application +// for each new QUIC stream containing a roq floqID. +func (s *Session) HandleUniStreamWithFlowID(flowID uint64, rs ReceiveStream) { if s.qlog != nil { s.qlog.Log(roqqlog.StreamOpenedEvent{ FlowID: flowID, @@ -244,3 +258,14 @@ func (s *Session) handleUniStream(rs ReceiveStream) { } f.readStream(rs) } + +func (s *Session) handleUniStream(rs ReceiveStream) { + reader := quicvarint.NewReader(rs) + flowID, err := quicvarint.Read(reader) + if err != nil { + s.closeWithError(ErrRoQPacketError, "invalid flow ID") + return + } + + s.HandleUniStreamWithFlowID(flowID, rs) +}