Skip to content

Commit

Permalink
feat: minimal write coalescing
Browse files Browse the repository at this point in the history
This won't coalesce with a delay, but will coalesce writes from multiple
streams and/or pings if they're available at the same time.
  • Loading branch information
Stebalien committed Jan 6, 2022
1 parent 6c2bbc3 commit a9143d7
Showing 1 changed file with 41 additions and 56 deletions.
97 changes: 41 additions & 56 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,25 +472,7 @@ func (s *Session) sendLoop() error {
return nil
}

writer := s.conn

// FIXME: https://github.com/libp2p/go-libp2p/issues/644
// Write coalescing is disabled for now.

//writer := pool.Writer{W: s.conn}

//var writeTimeout *time.Timer
//var writeTimeoutCh <-chan time.Time
//if s.config.WriteCoalesceDelay > 0 {
// writeTimeout = time.NewTimer(s.config.WriteCoalesceDelay)
// defer writeTimeout.Stop()

// writeTimeoutCh = writeTimeout.C
//} else {
// ch := make(chan time.Time)
// close(ch)
// writeTimeoutCh = ch
//}
writer := pool.Writer{W: s.conn}

for {
// yield after processing the last message, if we've shutdown.
Expand All @@ -502,56 +484,59 @@ func (s *Session) sendLoop() error {
}

var buf []byte
// Make sure to send any pings & pongs first so they don't get stuck behind writes.

// Write any pings if waiting.
select {
case pingID := <-s.pingCh:
buf = pool.Get(headerSize)
hdr := encode(typePing, flagSYN, 0, pingID)
copy(buf, hdr[:])
goto write
case pingID := <-s.pongCh:
buf = pool.Get(headerSize)
hdr := encode(typePing, flagACK, 0, pingID)
copy(buf, hdr[:])
goto write
default:
// Then send normal data.
select {
case buf = <-s.sendCh:
case pingID := <-s.pingCh:
buf = pool.Get(headerSize)
hdr := encode(typePing, flagSYN, 0, pingID)
copy(buf, hdr[:])
case pingID := <-s.pongCh:
buf = pool.Get(headerSize)
hdr := encode(typePing, flagACK, 0, pingID)
copy(buf, hdr[:])
case <-s.shutdownCh:
return nil
//default:
// select {
// case buf = <-s.sendCh:
// case <-s.shutdownCh:
// return nil
// case <-writeTimeoutCh:
// if err := writer.Flush(); err != nil {
// if os.IsTimeout(err) {
// err = ErrConnectionWriteTimeout
// }
// return err
// }

// select {
// case buf = <-s.sendCh:
// case <-s.shutdownCh:
// return nil
// }

// if writeTimeout != nil {
// writeTimeout.Reset(s.config.WriteCoalesceDelay)
// }
// }
}

// Then write normal data.
select {
case buf = <-s.sendCh:
goto write
default:
}

// If we get here, nothing is queued. Flush if we have data buffered.
if writer.Buffered() > 0 {
// We need to extend the write deadline because this might trigger a write.
if err := extendWriteDeadline(); err != nil {
return err
}
if err := writer.Flush(); err != nil {
if os.IsTimeout(err) {
err = ErrConnectionWriteTimeout
}
return err
}
}

// Then wait for data on all channels.
select {
case buf = <-s.sendCh:
case pingID := <-s.pingCh:
buf = pool.Get(headerSize)
hdr := encode(typePing, flagSYN, 0, pingID)
copy(buf, hdr[:])
case pingID := <-s.pongCh:
buf = pool.Get(headerSize)
hdr := encode(typePing, flagACK, 0, pingID)
copy(buf, hdr[:])
case <-s.shutdownCh:
return nil
}

write:
if err := extendWriteDeadline(); err != nil {
pool.Put(buf)
return err
Expand Down

0 comments on commit a9143d7

Please sign in to comment.