diff --git a/profiler/compression.go b/profiler/compression.go index 3a4250fbbf..0ca8a19556 100644 --- a/profiler/compression.go +++ b/profiler/compression.go @@ -137,9 +137,29 @@ func getZstdLevelOrDefault(level int) zstd.EncoderLevel { return zstd.SpeedDefault } -// newCompressionPipeline returns a compressor that converts the data written to -// it from the expected input compression to the given output compression. -func newCompressionPipeline(in compression, out compression) (compressor, error) { +type compressionPipelineBuilder struct { + zstdEncoders map[zstd.EncoderLevel]*sharedZstdEncoder +} + +func (b *compressionPipelineBuilder) getZstdEncoder(level zstd.EncoderLevel) (*sharedZstdEncoder, error) { + if b.zstdEncoders == nil { + b.zstdEncoders = make(map[zstd.EncoderLevel]*sharedZstdEncoder) + } + encoder, ok := b.zstdEncoders[level] + if !ok { + var err error + encoder, err = newSharedZstdEncoder(level) + if err != nil { + return nil, err + } + b.zstdEncoders[level] = encoder + } + return encoder, nil +} + +// Build returns a compressor that converts the data written to it from the +// expected input compression to the given output compression. +func (b *compressionPipelineBuilder) Build(in compression, out compression) (compressor, error) { if in == out { return newPassthroughCompressor(), nil } @@ -149,11 +169,15 @@ func newCompressionPipeline(in compression, out compression) (compressor, error) } if in == noCompression && out.algorithm == compressionAlgorithmZstd { - return zstd.NewWriter(nil, zstd.WithEncoderLevel(getZstdLevelOrDefault(out.level))) + return b.getZstdEncoder(getZstdLevelOrDefault(out.level)) } if in.algorithm == compressionAlgorithmGzip && out.algorithm == compressionAlgorithmZstd { - return newZstdRecompressor(getZstdLevelOrDefault(out.level)) + encoder, err := b.getZstdEncoder(getZstdLevelOrDefault(out.level)) + if err != nil { + return nil, err + } + return newZstdRecompressor(encoder), nil } return nil, fmt.Errorf("unsupported recompression: %s -> %s", in, out) @@ -164,8 +188,11 @@ func newCompressionPipeline(in compression, out compression) (compressor, error) // the data from one format and then re-compresses it into another format. type compressor interface { io.Writer - io.Closer + // Reset resets the compressor to the given writer. It may also acquire a + // shared underlying resource, so callers must always call Close(). Reset(w io.Writer) + // Close closes the compressor and releases any shared underlying resource. + Close() error } // newPassthroughCompressor returns a compressor that simply passes all data @@ -186,12 +213,8 @@ func (r *passthroughCompressor) Close() error { return nil } -func newZstdRecompressor(level zstd.EncoderLevel) (*zstdRecompressor, error) { - zstdOut, err := zstd.NewWriter(io.Discard, zstd.WithEncoderLevel(level)) - if err != nil { - return nil, err - } - return &zstdRecompressor{zstdOut: zstdOut, err: make(chan error)}, nil +func newZstdRecompressor(encoder *sharedZstdEncoder) *zstdRecompressor { + return &zstdRecompressor{zstdOut: encoder, err: make(chan error)} } type zstdRecompressor struct { @@ -199,8 +222,7 @@ type zstdRecompressor struct { // error during recompression err chan error pw io.WriteCloser - zstdOut *zstd.Encoder - level zstd.EncoderLevel + zstdOut *sharedZstdEncoder } func (r *zstdRecompressor) Reset(w io.Writer) { @@ -227,3 +249,36 @@ func (r *zstdRecompressor) Close() error { err := <-r.err return cmp.Or(err, r.zstdOut.Close()) } + +// newSharedZstdEncoder creates a new shared Zstd encoder with the given level. +// It expects the Reset and Close method to be used in an acquire and release +// fashion. +func newSharedZstdEncoder(level zstd.EncoderLevel) (*sharedZstdEncoder, error) { + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(level)) + if err != nil { + return nil, err + } + return &sharedZstdEncoder{encoder: encoder, sema: make(chan struct{}, 1)}, nil +} + +type sharedZstdEncoder struct { + encoder *zstd.Encoder + sema chan struct{} +} + +// Reset acquires the semaphore and resets the encoder to the given writer. +func (s *sharedZstdEncoder) Reset(w io.Writer) { + s.sema <- struct{}{} + s.encoder.Reset(w) +} + +func (s *sharedZstdEncoder) Write(p []byte) (int, error) { + return s.encoder.Write(p) +} + +// Close releases the semaphore and closes the encoder. +func (s *sharedZstdEncoder) Close() error { + err := s.encoder.Close() + <-s.sema + return err +} diff --git a/profiler/compression_test.go b/profiler/compression_test.go index c896cdaebf..297b6d3cf9 100644 --- a/profiler/compression_test.go +++ b/profiler/compression_test.go @@ -43,7 +43,8 @@ func TestNewCompressionPipeline(t *testing.T) { for _, test := range tests { t.Run(fmt.Sprintf("%s->%s", test.in, test.out), func(t *testing.T) { - pipeline, err := newCompressionPipeline(test.in, test.out) + var pipelineBuilder compressionPipelineBuilder + pipeline, err := pipelineBuilder.Build(test.in, test.out) require.NoError(t, err) buf := &bytes.Buffer{} pipeline.Reset(buf) @@ -172,8 +173,13 @@ func BenchmarkRecompression(b *testing.B) { b.Run(fmt.Sprintf("%s-%s", in.inAlg.String(), in.outLevel), func(b *testing.B) { data := compressData(b, inputdata, in.inAlg) b.ResetTimer() + var pipelineBuilder compressionPipelineBuilder for i := 0; i < b.N; i++ { - z := &zstdRecompressor{level: in.outLevel} + encoder, err := pipelineBuilder.getZstdEncoder(in.outLevel) + if err != nil { + b.Fatal(err) + } + z := newZstdRecompressor(encoder) z.Reset(io.Discard) if _, err := z.Write(data); err != nil { b.Fatal(err) diff --git a/profiler/profile.go b/profiler/profile.go index 660562381a..bef7ca84a5 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -89,6 +89,7 @@ var profileTypes = map[ProfileType]profileType{ Filename: "cpu.pprof", Collect: func(p *profiler) ([]byte, error) { var buf bytes.Buffer + var outBuf bytes.Buffer // Start the CPU profiler at the end of the profiling // period so that we're sure to capture the CPU usage of // this library, which mostly happens at the end @@ -101,9 +102,7 @@ var profileTypes = map[ProfileType]profileType{ runtime.SetCPUProfileRate(p.cfg.cpuProfileRate) } - compressor := p.compressors[CPUProfile] - compressor.Reset(&buf) - if err := p.startCPUProfile(compressor); err != nil { + if err := p.startCPUProfile(&outBuf); err != nil { return nil, err } p.interruptibleSleep(p.cfg.cpuDuration) @@ -113,10 +112,12 @@ var profileTypes = map[ProfileType]profileType{ // the other profile types p.pendingProfiles.Wait() p.stopCPUProfile() - if err := compressor.Close(); err != nil { - return nil, err - } - return buf.Bytes(), nil + + c := p.compressors[CPUProfile] + c.Reset(&buf) + _, writeErr := outBuf.WriteTo(c) + closeErr := c.Close() + return buf.Bytes(), cmp.Or(writeErr, closeErr) }, }, // HeapProfile is complex due to how the Go runtime exposes it. It contains 4 @@ -175,10 +176,10 @@ var profileTypes = map[ProfileType]profileType{ return nil, err } - compressor := p.compressors[expGoroutineWaitProfile] - compressor.Reset(pprof) - err := goroutineDebug2ToPprof(text, compressor, now) - err = cmp.Or(err, compressor.Close()) + c := p.compressors[expGoroutineWaitProfile] + c.Reset(pprof) + err := goroutineDebug2ToPprof(text, c, now) + err = cmp.Or(err, c.Close()) return pprof.Bytes(), err }, }, @@ -187,11 +188,11 @@ var profileTypes = map[ProfileType]profileType{ Filename: "metrics.json", Collect: func(p *profiler) ([]byte, error) { var buf bytes.Buffer - compressor := p.compressors[MetricsProfile] - compressor.Reset(&buf) + c := p.compressors[MetricsProfile] + c.Reset(&buf) interrupted := p.interruptibleSleep(p.cfg.period) - err := p.met.report(now(), compressor) - err = cmp.Or(err, compressor.Close()) + err := p.met.report(now(), c) + err = cmp.Or(err, c.Close()) if err != nil && interrupted { err = errProfilerStopped } @@ -204,9 +205,8 @@ var profileTypes = map[ProfileType]profileType{ Collect: func(p *profiler) ([]byte, error) { p.lastTrace = time.Now() buf := new(bytes.Buffer) - compressor := p.compressors[executionTrace] - compressor.Reset(buf) - lt := newLimitedTraceCollector(compressor, int64(p.cfg.traceConfig.Limit)) + outBuf := new(bytes.Buffer) + lt := newLimitedTraceCollector(outBuf, int64(p.cfg.traceConfig.Limit)) if err := trace.Start(lt); err != nil { return nil, err } @@ -217,10 +217,12 @@ var profileTypes = map[ProfileType]profileType{ case <-lt.done: // The trace size limit was exceeded } trace.Stop() - if err := compressor.Close(); err != nil { - return nil, err - } - return buf.Bytes(), nil + + c := p.compressors[executionTrace] + c.Reset(buf) + _, writeErr := outBuf.WriteTo(c) + closeErr := c.Close() + return buf.Bytes(), cmp.Or(writeErr, closeErr) }, }, } @@ -284,10 +286,10 @@ func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byt var buf bytes.Buffer dp, ok := p.deltas[pt] if !ok || !p.cfg.deltaProfiles { - compressor := p.compressors[pt] - compressor.Reset(&buf) - err := p.lookupProfile(name, compressor, 0) - err = cmp.Or(err, compressor.Close()) + c := p.compressors[pt] + c.Reset(&buf) + err := p.lookupProfile(name, c, 0) + err = cmp.Or(err, c.Close()) return buf.Bytes(), err } @@ -435,13 +437,15 @@ func (fdp *fastDeltaProfiler) Delta(data []byte) (b []byte, err error) { } fdp.buf.Reset() - fdp.compressor.Reset(&fdp.buf) - - if err = fdp.dc.Delta(data, fdp.compressor); err != nil { - return nil, fmt.Errorf("error computing delta: %s", err.Error()) - } - if err = fdp.compressor.Close(); err != nil { - return nil, fmt.Errorf("error flushing gzip writer: %s", err.Error()) + c := fdp.compressor + c.Reset(&fdp.buf) + + deltaErr := fdp.dc.Delta(data, c) + closeErr := c.Close() + if deltaErr != nil { + return nil, fmt.Errorf("error computing delta: %w", deltaErr) + } else if closeErr != nil { + return nil, fmt.Errorf("error flushing compressor: %w", closeErr) } // The returned slice will be retained in case the profile upload fails, // so we need to return a copy of the buffer's bytes to avoid a data diff --git a/profiler/profiler.go b/profiler/profiler.go index c5b45e13f9..4490612081 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -259,10 +259,11 @@ func newProfiler(opts ...Option) (*profiler, error) { if p.cfg.traceConfig.Enabled { types = append(types, executionTrace) } + var pipelineBuilder compressionPipelineBuilder for _, pt := range types { isDelta := p.cfg.deltaProfiles && len(profileTypes[pt].DeltaValues) > 0 in, out := compressionStrategy(pt, isDelta, p.cfg.compressionConfig) - compressor, err := newCompressionPipeline(in, out) + compressor, err := pipelineBuilder.Build(in, out) if err != nil { return nil, err }