Skip to content

Commit 4288cfc

Browse files
authored
Cherrypick #8657 and #8667 to v1.77.x (#8690)
Original PRs: #8657, #8667 RELEASE NOTES: * transport: Avoid copies when reading and writing Data frames.
1 parent f959da6 commit 4288cfc

File tree

25 files changed

+622
-86
lines changed

25 files changed

+622
-86
lines changed

examples/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ require (
7474
go.opentelemetry.io/otel/trace v1.38.0 // indirect
7575
go.yaml.in/yaml/v2 v2.4.3 // indirect
7676
golang.org/x/crypto v0.43.0 // indirect
77-
golang.org/x/net v0.46.0 // indirect
77+
golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82 // indirect
7878
golang.org/x/sync v0.17.0 // indirect
7979
golang.org/x/sys v0.37.0 // indirect
8080
golang.org/x/text v0.30.0 // indirect

examples/go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4125,8 +4125,9 @@ golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
41254125
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
41264126
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
41274127
golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
4128-
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
41294128
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
4129+
golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82 h1:6/3JGEh1C88g7m+qzzTbl3A0FtsLguXieqofVLU/JAo=
4130+
golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
41304131
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
41314132
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
41324133
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

gcp/observability/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ require (
5151
go.opentelemetry.io/otel/metric v1.38.0 // indirect
5252
go.opentelemetry.io/otel/trace v1.38.0 // indirect
5353
golang.org/x/crypto v0.43.0 // indirect
54-
golang.org/x/net v0.46.0 // indirect
54+
golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82 // indirect
5555
golang.org/x/sync v0.17.0 // indirect
5656
golang.org/x/sys v0.37.0 // indirect
5757
golang.org/x/text v0.30.0 // indirect

gcp/observability/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3813,8 +3813,8 @@ golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
38133813
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
38143814
golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
38153815
golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
3816-
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
3817-
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
3816+
golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82 h1:6/3JGEh1C88g7m+qzzTbl3A0FtsLguXieqofVLU/JAo=
3817+
golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
38183818
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
38193819
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
38203820
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ require (
1818
go.opentelemetry.io/otel/sdk v1.38.0
1919
go.opentelemetry.io/otel/sdk/metric v1.38.0
2020
go.opentelemetry.io/otel/trace v1.38.0
21-
golang.org/x/net v0.46.0
21+
golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82
2222
golang.org/x/oauth2 v0.32.0
2323
golang.org/x/sync v0.17.0
2424
golang.org/x/sys v0.37.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJr
5757
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
5858
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
5959
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
60-
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
61-
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
60+
golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82 h1:6/3JGEh1C88g7m+qzzTbl3A0FtsLguXieqofVLU/JAo=
61+
golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
6262
golang.org/x/oauth2 v0.32.0 h1:jsCblLleRMDrxMN29H3z/k1KliIvpLgCkE6R8FXXNgY=
6363
golang.org/x/oauth2 v0.32.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
6464
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=

internal/transport/controlbuf.go

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,16 @@ const (
496496
serverSide
497497
)
498498

499+
// maxWriteBufSize is the maximum length (number of elements) the cached
500+
// writeBuf can grow to. The length depends on the number of buffers
501+
// contained within the BufferSlice produced by the codec, which is
502+
// generally small.
503+
//
504+
// If a writeBuf larger than this limit is required, it will be allocated
505+
// and freed after use, rather than being cached. This avoids holding
506+
// on to large amounts of memory.
507+
const maxWriteBufSize = 64
508+
499509
// Loopy receives frames from the control buffer.
500510
// Each frame is handled individually; most of the work done by loopy goes
501511
// into handling data frames. Loopy maintains a queue of active streams, and each
@@ -530,6 +540,8 @@ type loopyWriter struct {
530540

531541
// Side-specific handlers
532542
ssGoAwayHandler func(*goAway) (bool, error)
543+
544+
writeBuf [][]byte // cached slice to avoid heap allocations for calls to mem.Reader.Peek.
533545
}
534546

535547
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
@@ -962,11 +974,11 @@ func (l *loopyWriter) processData() (bool, error) {
962974

963975
if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
964976
// Client sends out empty data frame with endStream = true
965-
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
977+
if err := l.framer.writeData(dataItem.streamID, dataItem.endStream, nil); err != nil {
966978
return false, err
967979
}
968980
str.itl.dequeue() // remove the empty data item from stream
969-
_ = reader.Close()
981+
reader.Close()
970982
if str.itl.isEmpty() {
971983
str.state = empty
972984
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -999,25 +1011,20 @@ func (l *loopyWriter) processData() (bool, error) {
9991011
remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
10001012
size := hSize + dSize
10011013

1002-
var buf *[]byte
1003-
1004-
if hSize != 0 && dSize == 0 {
1005-
buf = &dataItem.h
1006-
} else {
1007-
// Note: this is only necessary because the http2.Framer does not support
1008-
// partially writing a frame, so the sequence must be materialized into a buffer.
1009-
// TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
1010-
pool := l.bufferPool
1011-
if pool == nil {
1012-
// Note that this is only supposed to be nil in tests. Otherwise, stream is
1013-
// always initialized with a BufferPool.
1014-
pool = mem.DefaultBufferPool()
1014+
l.writeBuf = l.writeBuf[:0]
1015+
if hSize > 0 {
1016+
l.writeBuf = append(l.writeBuf, dataItem.h[:hSize])
1017+
}
1018+
if dSize > 0 {
1019+
var err error
1020+
l.writeBuf, err = reader.Peek(dSize, l.writeBuf)
1021+
if err != nil {
1022+
// This must never happen since the reader must have at least dSize
1023+
// bytes.
1024+
// Log an error to fail tests.
1025+
l.logger.Errorf("unexpected error while reading Data frame payload: %v", err)
1026+
return false, err
10151027
}
1016-
buf = pool.Get(size)
1017-
defer pool.Put(buf)
1018-
1019-
copy((*buf)[:hSize], dataItem.h)
1020-
_, _ = reader.Read((*buf)[hSize:])
10211028
}
10221029

10231030
// Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1030,15 +1037,22 @@ func (l *loopyWriter) processData() (bool, error) {
10301037
if dataItem.onEachWrite != nil {
10311038
dataItem.onEachWrite()
10321039
}
1033-
if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
1040+
err := l.framer.writeData(dataItem.streamID, endStream, l.writeBuf)
1041+
reader.Discard(dSize)
1042+
if cap(l.writeBuf) > maxWriteBufSize {
1043+
l.writeBuf = nil
1044+
} else {
1045+
clear(l.writeBuf)
1046+
}
1047+
if err != nil {
10341048
return false, err
10351049
}
10361050
str.bytesOutStanding += size
10371051
l.sendQuota -= uint32(size)
10381052
dataItem.h = dataItem.h[hSize:]
10391053

10401054
if remainingBytes == 0 { // All the data from that message was written out.
1041-
_ = reader.Close()
1055+
reader.Close()
10421056
str.itl.dequeue()
10431057
}
10441058
if str.itl.isEmpty() {

internal/transport/http2_client.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
336336
writerDone: make(chan struct{}),
337337
goAway: make(chan struct{}),
338338
keepaliveDone: make(chan struct{}),
339-
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
339+
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize, opts.BufferPool),
340340
fc: &trInFlow{limit: uint32(icwz)},
341341
scheme: scheme,
342342
activeStreams: make(map[uint32]*ClientStream),
@@ -1170,7 +1170,7 @@ func (t *http2Client) updateFlowControl(n uint32) {
11701170
})
11711171
}
11721172

1173-
func (t *http2Client) handleData(f *http2.DataFrame) {
1173+
func (t *http2Client) handleData(f *parsedDataFrame) {
11741174
size := f.Header().Length
11751175
var sendBDPPing bool
11761176
if t.bdpEst != nil {
@@ -1214,22 +1214,15 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
12141214
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
12151215
return
12161216
}
1217+
dataLen := f.data.Len()
12171218
if f.Header().Flags.Has(http2.FlagDataPadded) {
1218-
if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
1219+
if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
12191220
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
12201221
}
12211222
}
1222-
// TODO(bradfitz, zhaoq): A copy is required here because there is no
1223-
// guarantee f.Data() is consumed before the arrival of next frame.
1224-
// Can this copy be eliminated?
1225-
if len(f.Data()) > 0 {
1226-
pool := t.bufferPool
1227-
if pool == nil {
1228-
// Note that this is only supposed to be nil in tests. Otherwise, stream is
1229-
// always initialized with a BufferPool.
1230-
pool = mem.DefaultBufferPool()
1231-
}
1232-
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
1223+
if dataLen > 0 {
1224+
f.data.Ref()
1225+
s.write(recvMsg{buffer: f.data})
12331226
}
12341227
}
12351228
// The server has closed the stream without sending trailers. Record that
@@ -1659,7 +1652,7 @@ func (t *http2Client) reader(errCh chan<- error) {
16591652
// loop to keep reading incoming messages on this transport.
16601653
for {
16611654
t.controlBuf.throttle()
1662-
frame, err := t.framer.fr.ReadFrame()
1655+
frame, err := t.framer.readFrame()
16631656
if t.keepaliveEnabled {
16641657
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
16651658
}
@@ -1674,7 +1667,7 @@ func (t *http2Client) reader(errCh chan<- error) {
16741667
if s != nil {
16751668
// use error detail to provide better err message
16761669
code := http2ErrConvTab[se.Code]
1677-
errorDetail := t.framer.fr.ErrorDetail()
1670+
errorDetail := t.framer.errorDetail()
16781671
var msg string
16791672
if errorDetail != nil {
16801673
msg = errorDetail.Error()
@@ -1692,8 +1685,9 @@ func (t *http2Client) reader(errCh chan<- error) {
16921685
switch frame := frame.(type) {
16931686
case *http2.MetaHeadersFrame:
16941687
t.operateHeaders(frame)
1695-
case *http2.DataFrame:
1688+
case *parsedDataFrame:
16961689
t.handleData(frame)
1690+
frame.data.Free()
16971691
case *http2.RSTStreamFrame:
16981692
t.handleRSTStream(frame)
16991693
case *http2.SettingsFrame:

internal/transport/http2_server.go

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
169169
if config.MaxHeaderListSize != nil {
170170
maxHeaderListSize = *config.MaxHeaderListSize
171171
}
172-
framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
172+
framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize, config.BufferPool)
173173
// Send initial settings as connection preface to client.
174174
isettings := []http2.Setting{{
175175
ID: http2.SettingMaxFrameSize,
@@ -670,7 +670,7 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStre
670670
}()
671671
for {
672672
t.controlBuf.throttle()
673-
frame, err := t.framer.fr.ReadFrame()
673+
frame, err := t.framer.readFrame()
674674
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
675675
if err != nil {
676676
if se, ok := err.(http2.StreamError); ok {
@@ -707,8 +707,9 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStre
707707
})
708708
continue
709709
}
710-
case *http2.DataFrame:
710+
case *parsedDataFrame:
711711
t.handleData(frame)
712+
frame.data.Free()
712713
case *http2.RSTStreamFrame:
713714
t.handleRSTStream(frame)
714715
case *http2.SettingsFrame:
@@ -788,7 +789,7 @@ func (t *http2Server) updateFlowControl(n uint32) {
788789

789790
}
790791

791-
func (t *http2Server) handleData(f *http2.DataFrame) {
792+
func (t *http2Server) handleData(f *parsedDataFrame) {
792793
size := f.Header().Length
793794
var sendBDPPing bool
794795
if t.bdpEst != nil {
@@ -833,22 +834,15 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
833834
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
834835
return
835836
}
837+
dataLen := f.data.Len()
836838
if f.Header().Flags.Has(http2.FlagDataPadded) {
837-
if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
839+
if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
838840
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
839841
}
840842
}
841-
// TODO(bradfitz, zhaoq): A copy is required here because there is no
842-
// guarantee f.Data() is consumed before the arrival of next frame.
843-
// Can this copy be eliminated?
844-
if len(f.Data()) > 0 {
845-
pool := t.bufferPool
846-
if pool == nil {
847-
// Note that this is only supposed to be nil in tests. Otherwise, stream is
848-
// always initialized with a BufferPool.
849-
pool = mem.DefaultBufferPool()
850-
}
851-
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
843+
if dataLen > 0 {
844+
f.data.Ref()
845+
s.write(recvMsg{buffer: f.data})
852846
}
853847
}
854848
if f.StreamEnded() {

0 commit comments

Comments
 (0)