Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/interop/receiver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"io"

"github.com/mengelbart/qlog"
Expand All @@ -16,7 +17,7 @@ type receiver struct {
}

func newReceiver(conn roq.Connection, qlog *qlog.Logger) (*receiver, error) {
session, err := roq.NewSession(conn, true, qlog)
session, err := roq.NewSession(context.Background(), conn, true, qlog)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion examples/interop/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type sender struct {
}

func newSender(conn roq.Connection, qlog *qlog.Logger) (*sender, error) {
session, err := roq.NewSession(conn, true, qlog)
session, err := roq.NewSession(context.Background(), conn, true, qlog)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion examples/playfromdisk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func main() {
if err != nil {
panic(err)
}
session, err := roq.NewSession(roq.NewQUICGoConnection(conn), true, nil)
session, err := roq.NewSession(context.Background(), roq.NewQUICGoConnection(conn), true, nil)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/savetodisk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
if err != nil {
panic(err)
}
session, err := roq.NewSession(roq.NewQUICGoConnection(conn), true, nil)
session, err := roq.NewSession(context.Background(), roq.NewQUICGoConnection(conn), true, nil)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions integrationtests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ func accept(t *testing.T, ctx context.Context, listener *quic.Listener) *roq.Ses
conn, err := listener.Accept(ctx)
assert.NoError(t, err)
assert.NoError(t, err)
s, err := roq.NewSession(roq.NewQUICGoConnection(conn), true, nil)
s, err := roq.NewSession(context.Background(), roq.NewQUICGoConnection(conn), true, nil)
assert.NoError(t, err)
return s
}

func dial(t *testing.T, ctx context.Context, addr string) *roq.Session {
conn, err := quic.DialAddr(ctx, addr, generateTLSConfig(), &quic.Config{EnableDatagrams: true})
assert.NoError(t, err)
s, err := roq.NewSession(roq.NewQUICGoConnection(conn), true, nil)
s, err := roq.NewSession(context.Background(), roq.NewQUICGoConnection(conn), true, nil)
assert.NoError(t, err)
return s
}
Expand Down
4 changes: 2 additions & 2 deletions receive_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type ReceiveFlow struct {
qlog *qlog.Logger
}

func newReceiveFlow(id uint64, receiveBufferSize int, qlog *qlog.Logger) *ReceiveFlow {
ctx, cancel := context.WithCancel(context.Background())
func newReceiveFlow(ctx context.Context, id uint64, receiveBufferSize int, qlog *qlog.Logger) *ReceiveFlow {
ctx, cancel := context.WithCancel(ctx)
return &ReceiveFlow{
id: id,
buffer: make(chan *bytes.Buffer, receiveBufferSize),
Expand Down
18 changes: 9 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,23 @@ type Session struct {
}

// NewSession creates a new roq session. QUIC connection is handled by roq.
func NewSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) {
s := newSession(conn, acceptDatagrams, qlogger)
func NewSession(ctx context.Context, conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) {
s := newSession(ctx, conn, acceptDatagrams, qlogger)
s.start()

return s, nil
}

// NewSessionWithAppHandeledConn creates a new roq session. QUIC connection is handled by application.
// HandleDatagram and HandleUniStreamWithFlowID have to be called for each datagram / new stream.
func NewSessionWithAppHandeledConn(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) {
s := newSession(conn, acceptDatagrams, qlogger)
func NewSessionWithAppHandeledConn(ctx context.Context, conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) {
s := newSession(ctx, conn, acceptDatagrams, qlogger)

return s, nil
}

func newSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) *Session {
ctx, cancel := context.WithCancel(context.Background())
func newSession(ctx context.Context, conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) *Session {
ctx, cancel := context.WithCancel(ctx)
return &Session{
receiveBufferSize: defaultReceiveBufferSize,
acceptDatagrams: acceptDatagrams,
Expand Down Expand Up @@ -133,7 +133,7 @@ func (s *Session) NewReceiveFlow(id uint64) (*ReceiveFlow, error) {
var f *ReceiveFlow
f = s.receiveFlowBuffer.pop(id)
if f == nil {
f = newReceiveFlow(id, s.receiveBufferSize, s.qlog)
f = newReceiveFlow(s.ctx, id, s.receiveBufferSize, s.qlog)
}
if err := s.receiveFlows.add(id, f); err != nil {
return nil, err
Expand Down Expand Up @@ -228,7 +228,7 @@ func (s *Session) HandleDatagram(datagram []byte) {
}
f := s.receiveFlowBuffer.get(flowID)
if f == nil {
f = newReceiveFlow(flowID, s.receiveBufferSize, s.qlog)
f = newReceiveFlow(s.ctx, flowID, s.receiveBufferSize, s.qlog)
s.receiveFlowBuffer.add(f)
}
b := f.bufferPool.Get().(*bytes.Buffer)
Expand All @@ -253,7 +253,7 @@ func (s *Session) HandleUniStreamWithFlowID(flowID uint64, rs ReceiveStream) {
}
f := s.receiveFlowBuffer.get(flowID)
if f == nil {
f = newReceiveFlow(flowID, s.receiveBufferSize, s.qlog)
f = newReceiveFlow(s.ctx, flowID, s.receiveBufferSize, s.qlog)
s.receiveFlowBuffer.add(f)
}
f.readStream(rs)
Expand Down
Loading