diff --git a/pkg/blobstore/grpcservers/byte_stream_server.go b/pkg/blobstore/grpcservers/byte_stream_server.go index 8e377bb7..4d0df86c 100644 --- a/pkg/blobstore/grpcservers/byte_stream_server.go +++ b/pkg/blobstore/grpcservers/byte_stream_server.go @@ -222,7 +222,6 @@ func (s *byteStreamServer) writeZstd(stream bytestream.ByteStream_WriteServer, r if err != nil { return status.Errorf(codes.ResourceExhausted, "Failed to acquire ZSTD decoder: %v", err) } - defer zstdReader.Close() if err := s.blobAccess.Put( ctx, diff --git a/pkg/zstd/metrics_pool.go b/pkg/zstd/metrics_pool.go index 08e204f5..893fc316 100644 --- a/pkg/zstd/metrics_pool.go +++ b/pkg/zstd/metrics_pool.go @@ -123,9 +123,14 @@ func (p *metricsPool) NewDecoder(ctx context.Context, r io.Reader) (Decoder, err type metricsEncoder struct { Encoder releases prometheus.Counter + closed bool } func (e *metricsEncoder) Close() error { + if e.closed { + return nil + } + e.closed = true err := e.Encoder.Close() e.releases.Inc() return err @@ -134,9 +139,14 @@ func (e *metricsEncoder) Close() error { type metricsDecoder struct { Decoder releases prometheus.Counter + closed bool } func (d *metricsDecoder) Close() { + if d.closed { + return + } + d.closed = true d.Decoder.Close() d.releases.Inc() }