From a6be2796ab91f7d1e77ea89bb9f883e3081564a7 Mon Sep 17 00:00:00 2001 From: Nick Ripley Date: Tue, 21 Oct 2025 16:39:52 -0400 Subject: [PATCH 1/2] fix(profiler): reduce memory usage for compression The zstd compression library uses ~8MiB per compressor by default, primarily for the back-reference window. See this parameter: https://pkg.go.dev/github.com/klauspost/compress/zstd#WithWindowSize Since we have an encoder per profile type, this leads to a noticable increase in memory usage after switching to zstd by default. We can make the window smaller, but this can negatively affect the compression ratio. Instead, we can just use a single encoder and share it between the profile types. This commit does the bare minimum to implement a single encoder. It's a bit kludgy to use a separate global lock to guard access to the encoder. But it's awkward to plumb the synchronization around and keep it more encapsulated without a bigger refactor. --- profiler/compression.go | 38 +++++++++++++++++++++-- profiler/profile.go | 69 +++++++++++++++++++++++++++-------------- 2 files changed, 82 insertions(+), 25 deletions(-) diff --git a/profiler/compression.go b/profiler/compression.go index 3a4250fbbf..1d897cdfab 100644 --- a/profiler/compression.go +++ b/profiler/compression.go @@ -32,6 +32,7 @@ import ( "io" "strconv" "strings" + "sync" kgzip "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/zstd" @@ -137,6 +138,39 @@ func getZstdLevelOrDefault(level int) zstd.EncoderLevel { return zstd.SpeedDefault } +type sema struct { + c chan struct{} +} + +func (s *sema) Lock() { s.c <- struct{}{} } +func (s *sema) Unlock() { <-s.c } + +var ( + // compressionMux protects zstdEncoder. It must be locked + // when doing compression that _might_ use zstdEncoder. + // + // It's a channel-based semaphore rather than a mutex + // so that contention on it doesn't appear in the mutex + // profile (we expect it to be contended). + // This is a kludge. We only really want it for zstdEncoder, + // but the places where we actually do compression just + // take a compressor interface. It's easier for now to just + // have this global semaphore than to plumb the locking + // through the existing abstractions. + compressionMux = &sema{c: make(chan struct{}, 1)} + + zstdEncoderOnce sync.Once + zstdEncoder *zstd.Encoder + zstdEncoderErr error +) + +func getZstdEncoder(opts ...zstd.EOption) (*zstd.Encoder, error) { + zstdEncoderOnce.Do(func() { + zstdEncoder, zstdEncoderErr = zstd.NewWriter(nil, opts...) + }) + return zstdEncoder, zstdEncoderErr +} + // newCompressionPipeline returns a compressor that converts the data written to // it from the expected input compression to the given output compression. func newCompressionPipeline(in compression, out compression) (compressor, error) { @@ -149,7 +183,7 @@ func newCompressionPipeline(in compression, out compression) (compressor, error) } if in == noCompression && out.algorithm == compressionAlgorithmZstd { - return zstd.NewWriter(nil, zstd.WithEncoderLevel(getZstdLevelOrDefault(out.level))) + return getZstdEncoder(zstd.WithEncoderLevel(getZstdLevelOrDefault(out.level))) } if in.algorithm == compressionAlgorithmGzip && out.algorithm == compressionAlgorithmZstd { @@ -187,7 +221,7 @@ func (r *passthroughCompressor) Close() error { } func newZstdRecompressor(level zstd.EncoderLevel) (*zstdRecompressor, error) { - zstdOut, err := zstd.NewWriter(io.Discard, zstd.WithEncoderLevel(level)) + zstdOut, err := getZstdEncoder(zstd.WithEncoderLevel(level)) if err != nil { return nil, err } diff --git a/profiler/profile.go b/profiler/profile.go index 660562381a..e461e39761 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -89,6 +89,7 @@ var profileTypes = map[ProfileType]profileType{ Filename: "cpu.pprof", Collect: func(p *profiler) ([]byte, error) { var buf bytes.Buffer + var outBuf bytes.Buffer // Start the CPU profiler at the end of the profiling // period so that we're sure to capture the CPU usage of // this library, which mostly happens at the end @@ -101,9 +102,7 @@ var profileTypes = map[ProfileType]profileType{ runtime.SetCPUProfileRate(p.cfg.cpuProfileRate) } - compressor := p.compressors[CPUProfile] - compressor.Reset(&buf) - if err := p.startCPUProfile(compressor); err != nil { + if err := p.startCPUProfile(&outBuf); err != nil { return nil, err } p.interruptibleSleep(p.cfg.cpuDuration) @@ -113,7 +112,15 @@ var profileTypes = map[ProfileType]profileType{ // the other profile types p.pendingProfiles.Wait() p.stopCPUProfile() - if err := compressor.Close(); err != nil { + + c := p.compressors[CPUProfile] + compressionMux.Lock() + defer compressionMux.Unlock() + c.Reset(&buf) + if _, err := outBuf.WriteTo(c); err != nil { + return nil, err + } + if err := c.Close(); err != nil { return nil, err } return buf.Bytes(), nil @@ -175,10 +182,12 @@ var profileTypes = map[ProfileType]profileType{ return nil, err } - compressor := p.compressors[expGoroutineWaitProfile] - compressor.Reset(pprof) - err := goroutineDebug2ToPprof(text, compressor, now) - err = cmp.Or(err, compressor.Close()) + c := p.compressors[expGoroutineWaitProfile] + compressionMux.Lock() + defer compressionMux.Unlock() + c.Reset(pprof) + err := goroutineDebug2ToPprof(text, c, now) + err = cmp.Or(err, c.Close()) return pprof.Bytes(), err }, }, @@ -187,11 +196,13 @@ var profileTypes = map[ProfileType]profileType{ Filename: "metrics.json", Collect: func(p *profiler) ([]byte, error) { var buf bytes.Buffer - compressor := p.compressors[MetricsProfile] - compressor.Reset(&buf) + c := p.compressors[MetricsProfile] + compressionMux.Lock() + defer compressionMux.Unlock() + c.Reset(&buf) interrupted := p.interruptibleSleep(p.cfg.period) - err := p.met.report(now(), compressor) - err = cmp.Or(err, compressor.Close()) + err := p.met.report(now(), c) + err = cmp.Or(err, c.Close()) if err != nil && interrupted { err = errProfilerStopped } @@ -204,9 +215,8 @@ var profileTypes = map[ProfileType]profileType{ Collect: func(p *profiler) ([]byte, error) { p.lastTrace = time.Now() buf := new(bytes.Buffer) - compressor := p.compressors[executionTrace] - compressor.Reset(buf) - lt := newLimitedTraceCollector(compressor, int64(p.cfg.traceConfig.Limit)) + outBuf := new(bytes.Buffer) + lt := newLimitedTraceCollector(outBuf, int64(p.cfg.traceConfig.Limit)) if err := trace.Start(lt); err != nil { return nil, err } @@ -217,7 +227,15 @@ var profileTypes = map[ProfileType]profileType{ case <-lt.done: // The trace size limit was exceeded } trace.Stop() - if err := compressor.Close(); err != nil { + + c := p.compressors[executionTrace] + compressionMux.Lock() + defer compressionMux.Unlock() + c.Reset(buf) + if _, err := outBuf.WriteTo(c); err != nil { + return nil, err + } + if err := c.Close(); err != nil { return nil, err } return buf.Bytes(), nil @@ -284,10 +302,12 @@ func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byt var buf bytes.Buffer dp, ok := p.deltas[pt] if !ok || !p.cfg.deltaProfiles { - compressor := p.compressors[pt] - compressor.Reset(&buf) - err := p.lookupProfile(name, compressor, 0) - err = cmp.Or(err, compressor.Close()) + c := p.compressors[pt] + compressionMux.Lock() + defer compressionMux.Unlock() + c.Reset(&buf) + err := p.lookupProfile(name, c, 0) + err = cmp.Or(err, c.Close()) return buf.Bytes(), err } @@ -435,12 +455,15 @@ func (fdp *fastDeltaProfiler) Delta(data []byte) (b []byte, err error) { } fdp.buf.Reset() - fdp.compressor.Reset(&fdp.buf) + c := fdp.compressor + compressionMux.Lock() + defer compressionMux.Unlock() + c.Reset(&fdp.buf) - if err = fdp.dc.Delta(data, fdp.compressor); err != nil { + if err = fdp.dc.Delta(data, c); err != nil { return nil, fmt.Errorf("error computing delta: %s", err.Error()) } - if err = fdp.compressor.Close(); err != nil { + if err = c.Close(); err != nil { return nil, fmt.Errorf("error flushing gzip writer: %s", err.Error()) } // The returned slice will be retained in case the profile upload fails, From a37f42c9b5641be6940fa83e2de338291db74b44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 28 Oct 2025 12:57:22 +0100 Subject: [PATCH 2/2] refactor(profiler): alternative zstd encoder reuse approach (#4078) --- profiler/compression.go | 109 +++++++++++++++++++++-------------- profiler/compression_test.go | 10 +++- profiler/profile.go | 43 ++++---------- profiler/profiler.go | 3 +- 4 files changed, 87 insertions(+), 78 deletions(-) diff --git a/profiler/compression.go b/profiler/compression.go index 1d897cdfab..0ca8a19556 100644 --- a/profiler/compression.go +++ b/profiler/compression.go @@ -32,7 +32,6 @@ import ( "io" "strconv" "strings" - "sync" kgzip "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/zstd" @@ -138,42 +137,29 @@ func getZstdLevelOrDefault(level int) zstd.EncoderLevel { return zstd.SpeedDefault } -type sema struct { - c chan struct{} +type compressionPipelineBuilder struct { + zstdEncoders map[zstd.EncoderLevel]*sharedZstdEncoder } -func (s *sema) Lock() { s.c <- struct{}{} } -func (s *sema) Unlock() { <-s.c } - -var ( - // compressionMux protects zstdEncoder. It must be locked - // when doing compression that _might_ use zstdEncoder. - // - // It's a channel-based semaphore rather than a mutex - // so that contention on it doesn't appear in the mutex - // profile (we expect it to be contended). - // This is a kludge. We only really want it for zstdEncoder, - // but the places where we actually do compression just - // take a compressor interface. It's easier for now to just - // have this global semaphore than to plumb the locking - // through the existing abstractions. - compressionMux = &sema{c: make(chan struct{}, 1)} - - zstdEncoderOnce sync.Once - zstdEncoder *zstd.Encoder - zstdEncoderErr error -) - -func getZstdEncoder(opts ...zstd.EOption) (*zstd.Encoder, error) { - zstdEncoderOnce.Do(func() { - zstdEncoder, zstdEncoderErr = zstd.NewWriter(nil, opts...) - }) - return zstdEncoder, zstdEncoderErr +func (b *compressionPipelineBuilder) getZstdEncoder(level zstd.EncoderLevel) (*sharedZstdEncoder, error) { + if b.zstdEncoders == nil { + b.zstdEncoders = make(map[zstd.EncoderLevel]*sharedZstdEncoder) + } + encoder, ok := b.zstdEncoders[level] + if !ok { + var err error + encoder, err = newSharedZstdEncoder(level) + if err != nil { + return nil, err + } + b.zstdEncoders[level] = encoder + } + return encoder, nil } -// newCompressionPipeline returns a compressor that converts the data written to -// it from the expected input compression to the given output compression. -func newCompressionPipeline(in compression, out compression) (compressor, error) { +// Build returns a compressor that converts the data written to it from the +// expected input compression to the given output compression. +func (b *compressionPipelineBuilder) Build(in compression, out compression) (compressor, error) { if in == out { return newPassthroughCompressor(), nil } @@ -183,11 +169,15 @@ func newCompressionPipeline(in compression, out compression) (compressor, error) } if in == noCompression && out.algorithm == compressionAlgorithmZstd { - return getZstdEncoder(zstd.WithEncoderLevel(getZstdLevelOrDefault(out.level))) + return b.getZstdEncoder(getZstdLevelOrDefault(out.level)) } if in.algorithm == compressionAlgorithmGzip && out.algorithm == compressionAlgorithmZstd { - return newZstdRecompressor(getZstdLevelOrDefault(out.level)) + encoder, err := b.getZstdEncoder(getZstdLevelOrDefault(out.level)) + if err != nil { + return nil, err + } + return newZstdRecompressor(encoder), nil } return nil, fmt.Errorf("unsupported recompression: %s -> %s", in, out) @@ -198,8 +188,11 @@ func newCompressionPipeline(in compression, out compression) (compressor, error) // the data from one format and then re-compresses it into another format. type compressor interface { io.Writer - io.Closer + // Reset resets the compressor to the given writer. It may also acquire a + // shared underlying resource, so callers must always call Close(). Reset(w io.Writer) + // Close closes the compressor and releases any shared underlying resource. + Close() error } // newPassthroughCompressor returns a compressor that simply passes all data @@ -220,12 +213,8 @@ func (r *passthroughCompressor) Close() error { return nil } -func newZstdRecompressor(level zstd.EncoderLevel) (*zstdRecompressor, error) { - zstdOut, err := getZstdEncoder(zstd.WithEncoderLevel(level)) - if err != nil { - return nil, err - } - return &zstdRecompressor{zstdOut: zstdOut, err: make(chan error)}, nil +func newZstdRecompressor(encoder *sharedZstdEncoder) *zstdRecompressor { + return &zstdRecompressor{zstdOut: encoder, err: make(chan error)} } type zstdRecompressor struct { @@ -233,8 +222,7 @@ type zstdRecompressor struct { // error during recompression err chan error pw io.WriteCloser - zstdOut *zstd.Encoder - level zstd.EncoderLevel + zstdOut *sharedZstdEncoder } func (r *zstdRecompressor) Reset(w io.Writer) { @@ -261,3 +249,36 @@ func (r *zstdRecompressor) Close() error { err := <-r.err return cmp.Or(err, r.zstdOut.Close()) } + +// newSharedZstdEncoder creates a new shared Zstd encoder with the given level. +// It expects the Reset and Close method to be used in an acquire and release +// fashion. +func newSharedZstdEncoder(level zstd.EncoderLevel) (*sharedZstdEncoder, error) { + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(level)) + if err != nil { + return nil, err + } + return &sharedZstdEncoder{encoder: encoder, sema: make(chan struct{}, 1)}, nil +} + +type sharedZstdEncoder struct { + encoder *zstd.Encoder + sema chan struct{} +} + +// Reset acquires the semaphore and resets the encoder to the given writer. +func (s *sharedZstdEncoder) Reset(w io.Writer) { + s.sema <- struct{}{} + s.encoder.Reset(w) +} + +func (s *sharedZstdEncoder) Write(p []byte) (int, error) { + return s.encoder.Write(p) +} + +// Close releases the semaphore and closes the encoder. +func (s *sharedZstdEncoder) Close() error { + err := s.encoder.Close() + <-s.sema + return err +} diff --git a/profiler/compression_test.go b/profiler/compression_test.go index c896cdaebf..297b6d3cf9 100644 --- a/profiler/compression_test.go +++ b/profiler/compression_test.go @@ -43,7 +43,8 @@ func TestNewCompressionPipeline(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("%s->%s", test.in, test.out), func(t *testing.T) { - pipeline, err := newCompressionPipeline(test.in, test.out) + var pipelineBuilder compressionPipelineBuilder + pipeline, err := pipelineBuilder.Build(test.in, test.out) require.NoError(t, err) buf := &bytes.Buffer{} pipeline.Reset(buf) @@ -172,8 +173,13 @@ func BenchmarkRecompression(b *testing.B) { b.Run(fmt.Sprintf("%s-%s", in.inAlg.String(), in.outLevel), func(b *testing.B) { data := compressData(b, inputdata, in.inAlg) b.ResetTimer() + var pipelineBuilder compressionPipelineBuilder for i := 0; i < b.N; i++ { - z := &zstdRecompressor{level: in.outLevel} + encoder, err := pipelineBuilder.getZstdEncoder(in.outLevel) + if err != nil { + b.Fatal(err) + } + z := newZstdRecompressor(encoder) z.Reset(io.Discard) if _, err := z.Write(data); err != nil { b.Fatal(err) diff --git a/profiler/profile.go b/profiler/profile.go index e461e39761..bef7ca84a5 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -114,16 +114,10 @@ var profileTypes = map[ProfileType]profileType{ p.stopCPUProfile() c := p.compressors[CPUProfile] - compressionMux.Lock() - defer compressionMux.Unlock() c.Reset(&buf) - if _, err := outBuf.WriteTo(c); err != nil { - return nil, err - } - if err := c.Close(); err != nil { - return nil, err - } - return buf.Bytes(), nil + _, writeErr := outBuf.WriteTo(c) + closeErr := c.Close() + return buf.Bytes(), cmp.Or(writeErr, closeErr) }, }, // HeapProfile is complex due to how the Go runtime exposes it. It contains 4 @@ -183,8 +177,6 @@ var profileTypes = map[ProfileType]profileType{ } c := p.compressors[expGoroutineWaitProfile] - compressionMux.Lock() - defer compressionMux.Unlock() c.Reset(pprof) err := goroutineDebug2ToPprof(text, c, now) err = cmp.Or(err, c.Close()) @@ -197,8 +189,6 @@ var profileTypes = map[ProfileType]profileType{ Collect: func(p *profiler) ([]byte, error) { var buf bytes.Buffer c := p.compressors[MetricsProfile] - compressionMux.Lock() - defer compressionMux.Unlock() c.Reset(&buf) interrupted := p.interruptibleSleep(p.cfg.period) err := p.met.report(now(), c) @@ -229,16 +219,10 @@ var profileTypes = map[ProfileType]profileType{ trace.Stop() c := p.compressors[executionTrace] - compressionMux.Lock() - defer compressionMux.Unlock() c.Reset(buf) - if _, err := outBuf.WriteTo(c); err != nil { - return nil, err - } - if err := c.Close(); err != nil { - return nil, err - } - return buf.Bytes(), nil + _, writeErr := outBuf.WriteTo(c) + closeErr := c.Close() + return buf.Bytes(), cmp.Or(writeErr, closeErr) }, }, } @@ -303,8 +287,6 @@ func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byt dp, ok := p.deltas[pt] if !ok || !p.cfg.deltaProfiles { c := p.compressors[pt] - compressionMux.Lock() - defer compressionMux.Unlock() c.Reset(&buf) err := p.lookupProfile(name, c, 0) err = cmp.Or(err, c.Close()) @@ -456,15 +438,14 @@ func (fdp *fastDeltaProfiler) Delta(data []byte) (b []byte, err error) { fdp.buf.Reset() c := fdp.compressor - compressionMux.Lock() - defer compressionMux.Unlock() c.Reset(&fdp.buf) - if err = fdp.dc.Delta(data, c); err != nil { - return nil, fmt.Errorf("error computing delta: %s", err.Error()) - } - if err = c.Close(); err != nil { - return nil, fmt.Errorf("error flushing gzip writer: %s", err.Error()) + deltaErr := fdp.dc.Delta(data, c) + closeErr := c.Close() + if deltaErr != nil { + return nil, fmt.Errorf("error computing delta: %w", deltaErr) + } else if closeErr != nil { + return nil, fmt.Errorf("error flushing compressor: %w", closeErr) } // The returned slice will be retained in case the profile upload fails, // so we need to return a copy of the buffer's bytes to avoid a data diff --git a/profiler/profiler.go b/profiler/profiler.go index c5b45e13f9..4490612081 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -259,10 +259,11 @@ func newProfiler(opts ...Option) (*profiler, error) { if p.cfg.traceConfig.Enabled { types = append(types, executionTrace) } + var pipelineBuilder compressionPipelineBuilder for _, pt := range types { isDelta := p.cfg.deltaProfiles && len(profileTypes[pt].DeltaValues) > 0 in, out := compressionStrategy(pt, isDelta, p.cfg.compressionConfig) - compressor, err := newCompressionPipeline(in, out) + compressor, err := pipelineBuilder.Build(in, out) if err != nil { return nil, err }