Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/bb_copy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func main() {

blobAccessCreator := blobstore_configuration.NewCASBlobAccessCreator(
grpcClientFactory,
int(configuration.MaximumMessageSizeBytes))
int(configuration.MaximumMessageSizeBytes),
nil)
source, err := blobstore_configuration.NewBlobAccessFromConfiguration(
dependenciesGroup,
configuration.Source,
Expand Down
3 changes: 2 additions & 1 deletion cmd/bb_replicator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func main() {

blobAccessCreator := blobstore_configuration.NewCASBlobAccessCreator(
grpcClientFactory,
int(configuration.MaximumMessageSizeBytes))
int(configuration.MaximumMessageSizeBytes),
nil)
source, err := blobstore_configuration.NewBlobAccessFromConfiguration(
dependenciesGroup,
configuration.Source,
Expand Down
2 changes: 2 additions & 0 deletions cmd/bb_storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ go_library(
"//pkg/proto/icas",
"//pkg/proto/iscc",
"//pkg/util",
"//pkg/zstd",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@bazel_remote_apis//build/bazel/semver:semver_go_proto",
"@com_github_klauspost_compress//zstd",
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
Expand Down
34 changes: 32 additions & 2 deletions cmd/bb_storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/buildbarn/bb-storage/pkg/proto/icas"
"github.com/buildbarn/bb-storage/pkg/proto/iscc"
"github.com/buildbarn/bb-storage/pkg/util"
bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd"
"github.com/klauspost/compress/zstd"

"google.golang.org/genproto/googleapis/bytestream"
"google.golang.org/grpc"
Expand All @@ -42,6 +44,32 @@ func main() {
return util.StatusWrap(err, "Failed to apply global configuration options")
}

// Create a process-wide ZSTD compression pool if configured.
var zstdPool bb_zstd.Pool
if zc := configuration.ZstdCompression; zc != nil {
encoderOptions := []zstd.EOption{
zstd.WithEncoderConcurrency(1),
}
if zc.EncoderWindowSizeBytes != 0 {
encoderOptions = append(encoderOptions, zstd.WithWindowSize(int(zc.EncoderWindowSizeBytes)))
}
if zc.EncoderLevel != 0 {
encoderOptions = append(encoderOptions, zstd.WithEncoderLevel(zstd.EncoderLevel(zc.EncoderLevel)))
}
decoderOptions := []zstd.DOption{
zstd.WithDecoderConcurrency(1),
}
if zc.DecoderMaxWindowSizeBytes != 0 {
decoderOptions = append(decoderOptions, zstd.WithDecoderMaxWindow(uint64(zc.DecoderMaxWindowSizeBytes)))
}
zstdPool = bb_zstd.NewBoundedPool(
zc.MaximumEncoders,
zc.MaximumDecoders,
encoderOptions,
decoderOptions,
)
}

// Providers for data returned by ServerCapabilities.cache_capabilities
// as part of the GetCapabilities() call. We permit these calls
// if the client is permitted to at least one method against one
Expand All @@ -58,7 +86,8 @@ func main() {
configuration.ContentAddressableStorage,
blobstore_configuration.NewCASBlobAccessCreator(
grpcClientFactory,
int(configuration.MaximumMessageSizeBytes)),
int(configuration.MaximumMessageSizeBytes),
zstdPool),
grpcClientFactory)
if err != nil {
return util.StatusWrap(err, "Failed to create Content Addressable Storage")
Expand Down Expand Up @@ -185,7 +214,8 @@ func main() {
s,
grpcservers.NewByteStreamServer(
contentAddressableStorage,
1<<16))
1<<16,
zstdPool))
}
if actionCache != nil {
remoteexecution.RegisterActionCacheServer(
Expand Down
1 change: 1 addition & 0 deletions pkg/blobstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//pkg/proto/icas",
"//pkg/proto/iscc",
"//pkg/util",
"//pkg/zstd",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_aws_aws_sdk_go_v2//aws",
"@com_github_aws_aws_sdk_go_v2_service_s3//:s3",
Expand Down
1 change: 1 addition & 0 deletions pkg/blobstore/configuration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"//pkg/proto/configuration/digest",
"//pkg/random",
"//pkg/util",
"//pkg/zstd",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_aws_aws_sdk_go_v2_service_s3//:s3",
"@com_github_fxtlabs_primes//:primes",
Expand Down
7 changes: 5 additions & 2 deletions pkg/blobstore/configuration/cas_blob_access_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/grpcclients"
"github.com/buildbarn/bb-storage/pkg/blobstore/local"
bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd"
"github.com/buildbarn/bb-storage/pkg/capabilities"
"github.com/buildbarn/bb-storage/pkg/cloud/aws"
"github.com/buildbarn/bb-storage/pkg/cloud/gcp"
Expand Down Expand Up @@ -38,18 +39,20 @@ type casBlobAccessCreator struct {
casBlobReplicatorCreator

maximumMessageSizeBytes int
zstdPool bb_zstd.Pool
}

// NewCASBlobAccessCreator creates a BlobAccessCreator that can be
// provided to NewBlobAccessFromConfiguration() to construct a
// BlobAccess that is suitable for accessing the Content Addressable
// Storage.
func NewCASBlobAccessCreator(grpcClientFactory grpc.ClientFactory, maximumMessageSizeBytes int) BlobAccessCreator {
func NewCASBlobAccessCreator(grpcClientFactory grpc.ClientFactory, maximumMessageSizeBytes int, zstdPool bb_zstd.Pool) BlobAccessCreator {
return &casBlobAccessCreator{
casBlobReplicatorCreator: casBlobReplicatorCreator{
grpcClientFactory: grpcClientFactory,
},
maximumMessageSizeBytes: maximumMessageSizeBytes,
zstdPool: zstdPool,
}
}

Expand Down Expand Up @@ -96,7 +99,7 @@ func (bac *casBlobAccessCreator) NewCustomBlobAccess(terminationGroup program.Gr
// TODO: Should we provide a configuration option, so
// that digest.KeyWithoutInstance can be used?
return BlobAccessInfo{
BlobAccess: grpcclients.NewCASBlobAccess(client, uuid.NewRandom, 64<<10, backend.Grpc.EnableCompression),
BlobAccess: grpcclients.NewCASBlobAccess(client, uuid.NewRandom, 64<<10, backend.Grpc.EnableZstdCompression, bac.zstdPool),
DigestKeyFormat: digest.KeyWithInstance,
}, "grpc", nil
case *pb.BlobAccessConfiguration_ReferenceExpanding:
Expand Down
2 changes: 1 addition & 1 deletion pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func NewCASAndACBlobAccessFromConfiguration(terminationGroup program.Group, conf
contentAddressableStorage, err := NewBlobAccessFromConfiguration(
terminationGroup,
configuration.GetContentAddressableStorage(),
NewCASBlobAccessCreator(grpcClientFactory, maximumMessageSizeBytes))
NewCASBlobAccessCreator(grpcClientFactory, maximumMessageSizeBytes, nil))
if err != nil {
return nil, nil, util.StatusWrap(err, "Failed to create Content Addressable Storage")
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/blobstore/grpcclients/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ go_library(
"//pkg/proto/icas",
"//pkg/proto/iscc",
"//pkg/util",
"//pkg/zstd",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_google_uuid//:uuid",
"@com_github_klauspost_compress//zstd",
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
Expand All @@ -33,14 +33,17 @@ go_library(

go_test(
name = "grpcclients_test",
srcs = ["cas_blob_access_test.go"],
srcs = [
"cas_blob_access_test.go",
],
deps = [
":grpcclients",
"//internal/mock",
"//pkg/blobstore/buffer",
"//pkg/digest",
"//pkg/testutil",
"//pkg/util",
"//pkg/zstd",
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@bazel_remote_apis//build/bazel/semver:semver_go_proto",
"@com_github_google_uuid//:uuid",
Expand Down
96 changes: 68 additions & 28 deletions pkg/blobstore/grpcclients/cas_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package grpcclients

import (
"context"
"errors"
"io"
"slices"
"sync"
Expand All @@ -13,8 +14,8 @@ import (
"github.com/buildbarn/bb-storage/pkg/blobstore/slicing"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/util"
bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd"
"github.com/google/uuid"
"github.com/klauspost/compress/zstd"

"google.golang.org/genproto/googleapis/bytestream"
"google.golang.org/grpc"
Expand All @@ -31,6 +32,7 @@ type casBlobAccess struct {
readChunkSize int
enableZSTDCompression bool
supportedCompressors atomic.Pointer[[]remoteexecution.Compressor_Value]
zstdPool bb_zstd.Pool
}

// NewCASBlobAccess creates a BlobAccess handle that relays any requests
Expand All @@ -40,15 +42,17 @@ type casBlobAccess struct {
// Addressable Storage.
//
// If enableZSTDCompression is true, the client will use ZSTD compression
// for ByteStream operations if the server supports it.
func NewCASBlobAccess(client grpc.ClientConnInterface, uuidGenerator util.UUIDGenerator, readChunkSize int, enableZSTDCompression bool) blobstore.BlobAccess {
// for ByteStream operations if the server supports it. In that case,
// zstdPool must be provided for pooling encoders/decoders.
func NewCASBlobAccess(client grpc.ClientConnInterface, uuidGenerator util.UUIDGenerator, readChunkSize int, enableZSTDCompression bool, zstdPool bb_zstd.Pool) blobstore.BlobAccess {
return &casBlobAccess{
byteStreamClient: bytestream.NewByteStreamClient(client),
contentAddressableStorageClient: remoteexecution.NewContentAddressableStorageClient(client),
capabilitiesClient: remoteexecution.NewCapabilitiesClient(client),
uuidGenerator: uuidGenerator,
readChunkSize: readChunkSize,
enableZSTDCompression: enableZSTDCompression,
zstdPool: zstdPool,
}
}

Expand All @@ -74,47 +78,65 @@ func (r *byteStreamChunkReader) Close() {
}
}

// zstdByteStreamChunkReader reads compressed data from gRPC stream and decompresses using pooled decoder.
type zstdByteStreamChunkReader struct {
client bytestream.ByteStream_ReadClient
cancel context.CancelFunc
zstdReader io.ReadCloser
pool bb_zstd.Pool
decoder bb_zstd.Decoder
pipeReader *io.PipeReader
pipeWriter *io.PipeWriter
readChunkSize int
wg sync.WaitGroup
initOnce sync.Once
initErr error
}

func (r *zstdByteStreamChunkReader) Read() ([]byte, error) {
if r.zstdReader == nil {
pr, pw := io.Pipe()
func (r *zstdByteStreamChunkReader) init(ctx context.Context) error {
r.initOnce.Do(func() {
r.pipeReader, r.pipeWriter = io.Pipe()

// Start goroutine to read from gRPC and write to pipe
r.wg.Add(1)
go func() {
defer r.wg.Done()
defer pw.Close()
defer r.pipeWriter.Close()
for {
chunk, err := r.client.Recv()
if err != nil {
if err != io.EOF {
pw.CloseWithError(err)
r.pipeWriter.CloseWithError(err)
}
return
}
if _, writeErr := pw.Write(chunk.Data); writeErr != nil {
pw.CloseWithError(writeErr)
if _, writeErr := r.pipeWriter.Write(chunk.Data); writeErr != nil {
r.pipeWriter.CloseWithError(writeErr)
return
}
}
}()

var err error
r.zstdReader, err = util.NewZstdReadCloser(pr, zstd.WithDecoderConcurrency(1))
if err != nil {
pr.Close()
return nil, err
// Acquire decoder from pool (blocking if at capacity).
r.decoder, r.initErr = r.pool.NewDecoder(ctx, r.pipeReader)
if r.initErr != nil {
r.pipeReader.CloseWithError(r.initErr)
}
})
return r.initErr
}

func (r *zstdByteStreamChunkReader) Read() ([]byte, error) {
// Lazy initialization on first read. We use context.Background() here
// because the buffer.ChunkReader interface does not propagate a context.
// This means decoder pool acquisition on the read path cannot be
// cancelled by the original request context. The pool's semaphore will
// still bound concurrency; the caller can cancel by closing the reader.
if err := r.init(context.Background()); err != nil {
return nil, err
}

buf := make([]byte, r.readChunkSize)
n, err := r.zstdReader.Read(buf)
n, err := r.decoder.Read(buf)
if n > 0 {
if err != nil && err != io.EOF {
err = nil
Expand All @@ -125,12 +147,19 @@ func (r *zstdByteStreamChunkReader) Read() ([]byte, error) {
}

func (r *zstdByteStreamChunkReader) Close() {
if r.zstdReader != nil {
r.zstdReader.Close()
// Close releases decoder back to pool.
if r.decoder != nil {
r.decoder.Close()
r.decoder = nil
}

if r.pipeReader != nil {
r.pipeReader.Close()
}

r.cancel()

// Drain the gRPC stream.
// Drain the gRPC stream
for {
if _, err := r.client.Recv(); err != nil {
break
Expand Down Expand Up @@ -223,6 +252,7 @@ func (ba *casBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.B
return buffer.NewCASBufferFromChunkReader(digest, &zstdByteStreamChunkReader{
client: client,
cancel: cancel,
pool: ba.zstdPool,
readChunkSize: ba.readChunkSize,
}, buffer.BackendProvided(buffer.Irreparable(digest)))
}
Expand Down Expand Up @@ -269,21 +299,31 @@ func (ba *casBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer
cancel: cancel,
}

zstdWriter, err := zstd.NewWriter(byteStreamWriter, zstd.WithEncoderConcurrency(1))
// Acquire encoder from pool (blocks if at capacity — provides backpressure).
encoder, err := ba.zstdPool.NewEncoder(ctx, byteStreamWriter)
if err != nil {
cancel()
client.CloseAndRecv()
return status.Errorf(codes.Internal, "Failed to create zstd writer: %v", err)
b.Discard()
if _, closeErr := client.CloseAndRecv(); closeErr != nil {
return status.Errorf(codes.Internal, "Failed to close client: %v and acquire encoder: %v", closeErr, err)
}
return status.Errorf(codes.ResourceExhausted, "Failed to acquire ZSTD encoder: %v", err)
}

if err := b.IntoWriter(zstdWriter); err != nil {
zstdWriter.Close()
byteStreamWriter.Close()
if err := b.IntoWriter(encoder); err != nil {
if zstdCloseErr := encoder.Close(); zstdCloseErr != nil {
err = errors.Join(err, zstdCloseErr)
}
if closeErr := byteStreamWriter.Close(); closeErr != nil {
err = errors.Join(err, closeErr)
}
return err
}

if err := zstdWriter.Close(); err != nil {
byteStreamWriter.Close()
if err := encoder.Close(); err != nil {
if closeErr := byteStreamWriter.Close(); closeErr != nil {
err = errors.Join(err, closeErr)
}
return err
}

Expand Down
Loading
Loading