diff --git a/session.go b/session.go index f7e2453..5d5f6a0 100644 --- a/session.go +++ b/session.go @@ -472,25 +472,7 @@ func (s *Session) sendLoop() error { return nil } - writer := s.conn - - // FIXME: https://github.com/libp2p/go-libp2p/issues/644 - // Write coalescing is disabled for now. - - //writer := pool.Writer{W: s.conn} - - //var writeTimeout *time.Timer - //var writeTimeoutCh <-chan time.Time - //if s.config.WriteCoalesceDelay > 0 { - // writeTimeout = time.NewTimer(s.config.WriteCoalesceDelay) - // defer writeTimeout.Stop() - - // writeTimeoutCh = writeTimeout.C - //} else { - // ch := make(chan time.Time) - // close(ch) - // writeTimeoutCh = ch - //} + writer := pool.Writer{W: s.conn} for { // yield after processing the last message, if we've shutdown. @@ -502,56 +484,59 @@ func (s *Session) sendLoop() error { } var buf []byte - // Make sure to send any pings & pongs first so they don't get stuck behind writes. + + // Write any pings if waiting. select { case pingID := <-s.pingCh: buf = pool.Get(headerSize) hdr := encode(typePing, flagSYN, 0, pingID) copy(buf, hdr[:]) + goto write case pingID := <-s.pongCh: buf = pool.Get(headerSize) hdr := encode(typePing, flagACK, 0, pingID) copy(buf, hdr[:]) + goto write default: - // Then send normal data. - select { - case buf = <-s.sendCh: - case pingID := <-s.pingCh: - buf = pool.Get(headerSize) - hdr := encode(typePing, flagSYN, 0, pingID) - copy(buf, hdr[:]) - case pingID := <-s.pongCh: - buf = pool.Get(headerSize) - hdr := encode(typePing, flagACK, 0, pingID) - copy(buf, hdr[:]) - case <-s.shutdownCh: - return nil - //default: - // select { - // case buf = <-s.sendCh: - // case <-s.shutdownCh: - // return nil - // case <-writeTimeoutCh: - // if err := writer.Flush(); err != nil { - // if os.IsTimeout(err) { - // err = ErrConnectionWriteTimeout - // } - // return err - // } - - // select { - // case buf = <-s.sendCh: - // case <-s.shutdownCh: - // return nil - // } - - // if writeTimeout != nil { - // writeTimeout.Reset(s.config.WriteCoalesceDelay) - // } - // } + } + + // Then write normal data. + select { + case buf = <-s.sendCh: + goto write + default: + } + + // If we get here, nothing is queued. Flush if we have data buffered. + if writer.Buffered() > 0 { + // We need to extend the write deadline because this might trigger a write. + if err := extendWriteDeadline(); err != nil { + return err + } + if err := writer.Flush(); err != nil { + if os.IsTimeout(err) { + err = ErrConnectionWriteTimeout + } + return err } } + // Then wait for data on all channels. + select { + case buf = <-s.sendCh: + case pingID := <-s.pingCh: + buf = pool.Get(headerSize) + hdr := encode(typePing, flagSYN, 0, pingID) + copy(buf, hdr[:]) + case pingID := <-s.pongCh: + buf = pool.Get(headerSize) + hdr := encode(typePing, flagACK, 0, pingID) + copy(buf, hdr[:]) + case <-s.shutdownCh: + return nil + } + + write: if err := extendWriteDeadline(); err != nil { pool.Put(buf) return err