diff --git a/examples/interop/receiver.go b/examples/interop/receiver.go index f16e6de..7925b9a 100644 --- a/examples/interop/receiver.go +++ b/examples/interop/receiver.go @@ -1,6 +1,7 @@ package main import ( + "context" "io" "github.com/mengelbart/qlog" @@ -16,7 +17,7 @@ type receiver struct { } func newReceiver(conn roq.Connection, qlog *qlog.Logger) (*receiver, error) { - session, err := roq.NewSession(conn, true, qlog) + session, err := roq.NewSession(context.Background(), conn, true, qlog) if err != nil { return nil, err } diff --git a/examples/interop/sender.go b/examples/interop/sender.go index 87fe1d7..4e05991 100644 --- a/examples/interop/sender.go +++ b/examples/interop/sender.go @@ -26,7 +26,7 @@ type sender struct { } func newSender(conn roq.Connection, qlog *qlog.Logger) (*sender, error) { - session, err := roq.NewSession(conn, true, qlog) + session, err := roq.NewSession(context.Background(), conn, true, qlog) if err != nil { return nil, err } diff --git a/examples/playfromdisk/main.go b/examples/playfromdisk/main.go index 57cdd1e..fe43e28 100644 --- a/examples/playfromdisk/main.go +++ b/examples/playfromdisk/main.go @@ -81,7 +81,7 @@ func main() { if err != nil { panic(err) } - session, err := roq.NewSession(roq.NewQUICGoConnection(conn), true, nil) + session, err := roq.NewSession(context.Background(), roq.NewQUICGoConnection(conn), true, nil) if err != nil { panic(err) } diff --git a/examples/savetodisk/main.go b/examples/savetodisk/main.go index 61c396a..fe39466 100644 --- a/examples/savetodisk/main.go +++ b/examples/savetodisk/main.go @@ -34,7 +34,7 @@ func main() { if err != nil { panic(err) } - session, err := roq.NewSession(roq.NewQUICGoConnection(conn), true, nil) + session, err := roq.NewSession(context.Background(), roq.NewQUICGoConnection(conn), true, nil) if err != nil { panic(err) } diff --git a/integrationtests/integration_test.go b/integrationtests/integration_test.go index 0a9a139..f5840b7 100644 --- a/integrationtests/integration_test.go +++ b/integrationtests/integration_test.go @@ -26,7 +26,7 @@ func accept(t *testing.T, ctx context.Context, listener *quic.Listener) *roq.Ses conn, err := listener.Accept(ctx) assert.NoError(t, err) assert.NoError(t, err) - s, err := roq.NewSession(roq.NewQUICGoConnection(conn), true, nil) + s, err := roq.NewSession(context.Background(), roq.NewQUICGoConnection(conn), true, nil) assert.NoError(t, err) return s } @@ -34,7 +34,7 @@ func accept(t *testing.T, ctx context.Context, listener *quic.Listener) *roq.Ses func dial(t *testing.T, ctx context.Context, addr string) *roq.Session { conn, err := quic.DialAddr(ctx, addr, generateTLSConfig(), &quic.Config{EnableDatagrams: true}) assert.NoError(t, err) - s, err := roq.NewSession(roq.NewQUICGoConnection(conn), true, nil) + s, err := roq.NewSession(context.Background(), roq.NewQUICGoConnection(conn), true, nil) assert.NoError(t, err) return s } diff --git a/receive_flow.go b/receive_flow.go index 62c83c2..e0b095c 100644 --- a/receive_flow.go +++ b/receive_flow.go @@ -25,8 +25,8 @@ type ReceiveFlow struct { qlog *qlog.Logger } -func newReceiveFlow(id uint64, receiveBufferSize int, qlog *qlog.Logger) *ReceiveFlow { - ctx, cancel := context.WithCancel(context.Background()) +func newReceiveFlow(ctx context.Context, id uint64, receiveBufferSize int, qlog *qlog.Logger) *ReceiveFlow { + ctx, cancel := context.WithCancel(ctx) return &ReceiveFlow{ id: id, buffer: make(chan *bytes.Buffer, receiveBufferSize), diff --git a/session.go b/session.go index f17e2a7..4b10746 100644 --- a/session.go +++ b/session.go @@ -54,8 +54,8 @@ type Session struct { } // NewSession creates a new roq session. QUIC connection is handled by roq. -func NewSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) { - s := newSession(conn, acceptDatagrams, qlogger) +func NewSession(ctx context.Context, conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) { + s := newSession(ctx, conn, acceptDatagrams, qlogger) s.start() return s, nil @@ -63,14 +63,14 @@ func NewSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*S // NewSessionWithAppHandeledConn creates a new roq session. QUIC connection is handled by application. // HandleDatagram and HandleUniStreamWithFlowID have to be called for each datagram / new stream. -func NewSessionWithAppHandeledConn(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) { - s := newSession(conn, acceptDatagrams, qlogger) +func NewSessionWithAppHandeledConn(ctx context.Context, conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) (*Session, error) { + s := newSession(ctx, conn, acceptDatagrams, qlogger) return s, nil } -func newSession(conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) *Session { - ctx, cancel := context.WithCancel(context.Background()) +func newSession(ctx context.Context, conn Connection, acceptDatagrams bool, qlogger *qlog.Logger) *Session { + ctx, cancel := context.WithCancel(ctx) return &Session{ receiveBufferSize: defaultReceiveBufferSize, acceptDatagrams: acceptDatagrams, @@ -133,7 +133,7 @@ func (s *Session) NewReceiveFlow(id uint64) (*ReceiveFlow, error) { var f *ReceiveFlow f = s.receiveFlowBuffer.pop(id) if f == nil { - f = newReceiveFlow(id, s.receiveBufferSize, s.qlog) + f = newReceiveFlow(s.ctx, id, s.receiveBufferSize, s.qlog) } if err := s.receiveFlows.add(id, f); err != nil { return nil, err @@ -228,7 +228,7 @@ func (s *Session) HandleDatagram(datagram []byte) { } f := s.receiveFlowBuffer.get(flowID) if f == nil { - f = newReceiveFlow(flowID, s.receiveBufferSize, s.qlog) + f = newReceiveFlow(s.ctx, flowID, s.receiveBufferSize, s.qlog) s.receiveFlowBuffer.add(f) } b := f.bufferPool.Get().(*bytes.Buffer) @@ -253,7 +253,7 @@ func (s *Session) HandleUniStreamWithFlowID(flowID uint64, rs ReceiveStream) { } f := s.receiveFlowBuffer.get(flowID) if f == nil { - f = newReceiveFlow(flowID, s.receiveBufferSize, s.qlog) + f = newReceiveFlow(s.ctx, flowID, s.receiveBufferSize, s.qlog) s.receiveFlowBuffer.add(f) } f.readStream(rs)