Skip to content

Commit 2bcd102

Browse files
committed
changes no tests
1 parent bbd9a13 commit 2bcd102

File tree

4 files changed

+134
-34
lines changed

4 files changed

+134
-34
lines changed

internal/transport/http2_client.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
335335
writerDone: make(chan struct{}),
336336
goAway: make(chan struct{}),
337337
keepaliveDone: make(chan struct{}),
338-
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
338+
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize, opts.BufferPool),
339339
fc: &trInFlow{limit: uint32(icwz)},
340340
scheme: scheme,
341341
activeStreams: make(map[uint32]*ClientStream),
@@ -1177,7 +1177,12 @@ func (t *http2Client) updateFlowControl(n uint32) {
11771177
})
11781178
}
11791179

1180-
func (t *http2Client) handleData(f *http2.DataFrame) {
1180+
func (t *http2Client) handleData(f *parsedDataFrame) {
1181+
defer func() {
1182+
if f.data != nil {
1183+
f.data.Free()
1184+
}
1185+
}()
11811186
size := f.Header().Length
11821187
var sendBDPPing bool
11831188
if t.bdpEst != nil {
@@ -1221,22 +1226,15 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
12211226
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
12221227
return
12231228
}
1229+
dataLen := f.data.Len()
12241230
if f.Header().Flags.Has(http2.FlagDataPadded) {
1225-
if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
1231+
if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
12261232
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
12271233
}
12281234
}
1229-
// TODO(bradfitz, zhaoq): A copy is required here because there is no
1230-
// guarantee f.Data() is consumed before the arrival of next frame.
1231-
// Can this copy be eliminated?
1232-
if len(f.Data()) > 0 {
1233-
pool := t.bufferPool
1234-
if pool == nil {
1235-
// Note that this is only supposed to be nil in tests. Otherwise, stream is
1236-
// always initialized with a BufferPool.
1237-
pool = mem.DefaultBufferPool()
1238-
}
1239-
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
1235+
if dataLen > 0 {
1236+
s.write(recvMsg{buffer: f.data})
1237+
f.data = nil
12401238
}
12411239
}
12421240
// The server has closed the stream without sending trailers. Record that
@@ -1656,6 +1654,13 @@ func (t *http2Client) reader(errCh chan<- error) {
16561654
}
16571655
}()
16581656

1657+
pool := t.bufferPool
1658+
if pool == nil {
1659+
// Note that this is only supposed to be nil in tests. Otherwise, stream
1660+
// is always initialized with a BufferPool.
1661+
pool = mem.DefaultBufferPool()
1662+
}
1663+
16591664
if err := t.readServerPreface(); err != nil {
16601665
errCh <- err
16611666
return
@@ -1668,7 +1673,7 @@ func (t *http2Client) reader(errCh chan<- error) {
16681673
// loop to keep reading incoming messages on this transport.
16691674
for {
16701675
t.controlBuf.throttle()
1671-
frame, err := t.framer.fr.ReadFrame()
1676+
frame, err := t.framer.readFrame()
16721677
if t.keepaliveEnabled {
16731678
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
16741679
}
@@ -1701,7 +1706,7 @@ func (t *http2Client) reader(errCh chan<- error) {
17011706
switch frame := frame.(type) {
17021707
case *http2.MetaHeadersFrame:
17031708
t.operateHeaders(frame)
1704-
case *http2.DataFrame:
1709+
case *parsedDataFrame:
17051710
t.handleData(frame)
17061711
case *http2.RSTStreamFrame:
17071712
t.handleRSTStream(frame)

internal/transport/http2_server.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
169169
if config.MaxHeaderListSize != nil {
170170
maxHeaderListSize = *config.MaxHeaderListSize
171171
}
172-
framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
172+
173+
framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize, config.BufferPool)
173174
// Send initial settings as connection preface to client.
174175
isettings := []http2.Setting{{
175176
ID: http2.SettingMaxFrameSize,
@@ -668,10 +669,16 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStre
668669
close(t.readerDone)
669670
<-t.loopyWriterDone
670671
}()
672+
pool := t.bufferPool
673+
if pool == nil {
674+
// Note that this is only supposed to be nil in tests. Otherwise, stream
675+
// is always initialized with a BufferPool.
676+
pool = mem.DefaultBufferPool()
677+
}
671678
for {
672679
t.controlBuf.throttle()
673-
frame, err := t.framer.fr.ReadFrame()
674680
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
681+
frame, err := t.framer.readFrame()
675682
if err != nil {
676683
if se, ok := err.(http2.StreamError); ok {
677684
if t.logger.V(logLevel) {
@@ -707,7 +714,7 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStre
707714
})
708715
continue
709716
}
710-
case *http2.DataFrame:
717+
case *parsedDataFrame:
711718
t.handleData(frame)
712719
case *http2.RSTStreamFrame:
713720
t.handleRSTStream(frame)
@@ -788,7 +795,12 @@ func (t *http2Server) updateFlowControl(n uint32) {
788795

789796
}
790797

791-
func (t *http2Server) handleData(f *http2.DataFrame) {
798+
func (t *http2Server) handleData(f *parsedDataFrame) {
799+
defer func() {
800+
if f.data != nil {
801+
f.data.Free()
802+
}
803+
}()
792804
size := f.Header().Length
793805
var sendBDPPing bool
794806
if t.bdpEst != nil {
@@ -833,22 +845,15 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
833845
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
834846
return
835847
}
848+
dataLen := f.data.Len()
836849
if f.Header().Flags.Has(http2.FlagDataPadded) {
837-
if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
850+
if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
838851
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
839852
}
840853
}
841-
// TODO(bradfitz, zhaoq): A copy is required here because there is no
842-
// guarantee f.Data() is consumed before the arrival of next frame.
843-
// Can this copy be eliminated?
844-
if len(f.Data()) > 0 {
845-
pool := t.bufferPool
846-
if pool == nil {
847-
// Note that this is only supposed to be nil in tests. Otherwise, stream is
848-
// always initialized with a BufferPool.
849-
pool = mem.DefaultBufferPool()
850-
}
851-
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
854+
if dataLen > 0 {
855+
s.write(recvMsg{buffer: f.data})
856+
f.data = nil
852857
}
853858
}
854859
if f.StreamEnded() {

internal/transport/http_util.go

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"golang.org/x/net/http2"
3838
"golang.org/x/net/http2/hpack"
3939
"google.golang.org/grpc/codes"
40+
"google.golang.org/grpc/mem"
4041
)
4142

4243
const (
@@ -388,15 +389,34 @@ func toIOError(err error) error {
388389
return ioError{error: err}
389390
}
390391

392+
type parsedDataFrame struct {
393+
http2.FrameHeader
394+
data mem.Buffer
395+
}
396+
397+
func (df *parsedDataFrame) StreamEnded() bool {
398+
return df.FrameHeader.Flags.Has(http2.FlagDataEndStream)
399+
}
400+
391401
type framer struct {
392402
writer *bufWriter
393403
fr *http2.Framer
404+
reader io.Reader
405+
// Cached data frame to avoid heap allocations.
406+
dataFrame parsedDataFrame
407+
pool mem.BufferPool
394408
}
395409

396410
var writeBufferPoolMap = make(map[int]*sync.Pool)
397411
var writeBufferMutex sync.Mutex
398412

399-
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {
413+
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32, memPool mem.BufferPool) *framer {
414+
if memPool == nil {
415+
// Note that this is only supposed to be nil in tests. Otherwise, stream
416+
// is always initialized with a BufferPool.
417+
memPool = mem.DefaultBufferPool()
418+
}
419+
400420
if writeBufferSize < 0 {
401421
writeBufferSize = 0
402422
}
@@ -412,6 +432,8 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu
412432
f := &framer{
413433
writer: w,
414434
fr: http2.NewFramer(w, r),
435+
reader: r,
436+
pool: memPool,
415437
}
416438
f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
417439
// Opt-in to Frame reuse API on framer to reduce garbage.
@@ -422,6 +444,73 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu
422444
return f
423445
}
424446

447+
func (f *framer) readFrame() (any, error) {
448+
fh, err := f.fr.ReadFrameHeader()
449+
if err != nil {
450+
return nil, err
451+
}
452+
if fh.Type == http2.FrameData {
453+
err = f.readDataFrame(fh, f.pool, &f.dataFrame)
454+
return &f.dataFrame, err
455+
} else {
456+
return f.fr.ReadFrameForHeader(fh)
457+
}
458+
}
459+
460+
// readDataFrame reads and parses a data frame from the underlying io.Reader.
461+
// Frames aren't safe to read from after a subsequent call to ReadFrame.
462+
func (f *framer) readDataFrame(fh http2.FrameHeader, pool mem.BufferPool, df *parsedDataFrame) (err error) {
463+
if fh.StreamID == 0 {
464+
// DATA frames MUST be associated with a stream. If a
465+
// DATA frame is received whose stream identifier
466+
// field is 0x0, the recipient MUST respond with a
467+
// connection error (Section 5.4.1) of type
468+
// PROTOCOL_ERROR.
469+
return fmt.Errorf("DATA frame with stream ID 0")
470+
}
471+
payload := pool.Get(int(fh.Length))
472+
defer func() {
473+
if err != nil {
474+
f.pool.Put(payload)
475+
}
476+
}()
477+
if fh.Flags.Has(http2.FlagDataPadded) {
478+
if fh.Length == 0 {
479+
return io.ErrUnexpectedEOF
480+
}
481+
// This initial 1-byte read can be inefficient for unbuffered readers,
482+
// but it allows the rest of the payload to be read directly to the
483+
// start of the destination slice. This makes it easy to return the
484+
// original slice back to the buffer pool.
485+
if _, err := io.ReadFull(f.reader, (*payload)[:1]); err != nil {
486+
return err
487+
}
488+
padSize := (*payload)[0]
489+
*payload = (*payload)[:len(*payload)-1]
490+
if _, err := io.ReadFull(f.reader, *payload); err != nil {
491+
return err
492+
}
493+
if int(padSize) > len(*payload) {
494+
// If the length of the padding is greater than the
495+
// length of the frame payload, the recipient MUST
496+
// treat this as a connection error.
497+
// Filed: https://github.com/http2/http2-spec/issues/610
498+
return fmt.Errorf("pad size larger than data payload")
499+
}
500+
*payload = (*payload)[:len(*payload)-int(padSize)]
501+
} else if _, err := io.ReadFull(f.reader, *payload); err != nil {
502+
return err
503+
}
504+
505+
df.FrameHeader = fh
506+
df.data = mem.NewBuffer(payload, pool)
507+
return nil
508+
}
509+
510+
func (df *parsedDataFrame) Header() http2.FrameHeader {
511+
return df.FrameHeader
512+
}
513+
425514
func getWriteBufferPool(size int) *sync.Pool {
426515
writeBufferMutex.Lock()
427516
defer writeBufferMutex.Unlock()

internal/transport/keepalive_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"google.golang.org/grpc/internal/grpctest"
4141
"google.golang.org/grpc/internal/syscall"
4242
"google.golang.org/grpc/keepalive"
43+
"google.golang.org/grpc/mem"
4344
"google.golang.org/grpc/testdata"
4445
)
4546

@@ -192,7 +193,7 @@ func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
192193
if n, err := conn.Write(clientPreface); err != nil || n != len(clientPreface) {
193194
t.Fatalf("conn.Write(clientPreface) failed: n=%v, err=%v", n, err)
194195
}
195-
framer := newFramer(conn, defaultWriteBufSize, defaultReadBufSize, false, 0)
196+
framer := newFramer(conn, defaultWriteBufSize, defaultReadBufSize, false, 0, mem.DefaultBufferPool())
196197
if err := framer.fr.WriteSettings(http2.Setting{}); err != nil {
197198
t.Fatal("framer.WriteSettings(http2.Setting{}) failed:", err)
198199
}

0 commit comments

Comments
 (0)