diff --git a/cmd/bb_copy/BUILD.bazel b/cmd/bb_copy/BUILD.bazel index 8ad6edc1..41a30bb6 100644 --- a/cmd/bb_copy/BUILD.bazel +++ b/cmd/bb_copy/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/program", "//pkg/proto/configuration/bb_copy", "//pkg/util", + "//pkg/zstd", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", ], diff --git a/cmd/bb_copy/main.go b/cmd/bb_copy/main.go index 1c4b8901..17003bb4 100644 --- a/cmd/bb_copy/main.go +++ b/cmd/bb_copy/main.go @@ -11,6 +11,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/program" "github.com/buildbarn/bb-storage/pkg/proto/configuration/bb_copy" "github.com/buildbarn/bb-storage/pkg/util" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -43,7 +44,8 @@ func main() { blobAccessCreator := blobstore_configuration.NewCASBlobAccessCreator( grpcClientFactory, - int(configuration.MaximumMessageSizeBytes)) + int(configuration.MaximumMessageSizeBytes), + bb_zstd.NewPoolFromConfiguration(nil)) source, err := blobstore_configuration.NewBlobAccessFromConfiguration( dependenciesGroup, configuration.Source, diff --git a/cmd/bb_replicator/BUILD.bazel b/cmd/bb_replicator/BUILD.bazel index 10c3353c..67f4bb2f 100644 --- a/cmd/bb_replicator/BUILD.bazel +++ b/cmd/bb_replicator/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/proto/configuration/bb_replicator", "//pkg/proto/replicator", "//pkg/util", + "//pkg/zstd", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", diff --git a/cmd/bb_replicator/main.go b/cmd/bb_replicator/main.go index acd61dc5..e65a483b 100644 --- a/cmd/bb_replicator/main.go +++ b/cmd/bb_replicator/main.go @@ -12,6 +12,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/proto/configuration/bb_replicator" replicator_pb "github.com/buildbarn/bb-storage/pkg/proto/replicator" "github.com/buildbarn/bb-storage/pkg/util" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -34,7 +35,8 @@ func main() { blobAccessCreator := blobstore_configuration.NewCASBlobAccessCreator( grpcClientFactory, - int(configuration.MaximumMessageSizeBytes)) + int(configuration.MaximumMessageSizeBytes), + bb_zstd.NewPoolFromConfiguration(nil)) source, err := blobstore_configuration.NewBlobAccessFromConfiguration( dependenciesGroup, configuration.Source, diff --git a/cmd/bb_storage/BUILD.bazel b/cmd/bb_storage/BUILD.bazel index 9d6e75d5..b9d6bc74 100644 --- a/cmd/bb_storage/BUILD.bazel +++ b/cmd/bb_storage/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/proto/icas", "//pkg/proto/iscc", "//pkg/util", + "//pkg/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@bazel_remote_apis//build/bazel/semver:semver_go_proto", "@org_golang_google_genproto_googleapis_bytestream//:bytestream", diff --git a/cmd/bb_storage/main.go b/cmd/bb_storage/main.go index 4befa9de..2160b984 100644 --- a/cmd/bb_storage/main.go +++ b/cmd/bb_storage/main.go @@ -21,6 +21,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/proto/icas" "github.com/buildbarn/bb-storage/pkg/proto/iscc" "github.com/buildbarn/bb-storage/pkg/util" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "google.golang.org/genproto/googleapis/bytestream" "google.golang.org/grpc" @@ -42,6 +43,9 @@ func main() { return util.StatusWrap(err, "Failed to apply global configuration options") } + // Create a process-wide ZSTD compression pool. + zstdPool := bb_zstd.NewPoolFromConfiguration(configuration.ZstdPool) + // Providers for data returned by ServerCapabilities.cache_capabilities // as part of the GetCapabilities() call. We permit these calls // if the client is permitted to at least one method against one @@ -58,7 +62,8 @@ func main() { configuration.ContentAddressableStorage, blobstore_configuration.NewCASBlobAccessCreator( grpcClientFactory, - int(configuration.MaximumMessageSizeBytes)), + int(configuration.MaximumMessageSizeBytes), + zstdPool), grpcClientFactory) if err != nil { return util.StatusWrap(err, "Failed to create Content Addressable Storage") @@ -185,7 +190,8 @@ func main() { s, grpcservers.NewByteStreamServer( contentAddressableStorage, - 1<<16)) + 1<<16, + zstdPool)) } if actionCache != nil { remoteexecution.RegisterActionCacheServer( diff --git a/pkg/blobstore/BUILD.bazel b/pkg/blobstore/BUILD.bazel index 2649c9c5..45d05453 100644 --- a/pkg/blobstore/BUILD.bazel +++ b/pkg/blobstore/BUILD.bazel @@ -43,10 +43,10 @@ go_library( "//pkg/proto/icas", "//pkg/proto/iscc", "//pkg/util", + "//pkg/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@com_github_aws_aws_sdk_go_v2//aws", "@com_github_aws_aws_sdk_go_v2_service_s3//:s3", - "@com_github_klauspost_compress//zstd", "@com_github_prometheus_client_golang//prometheus", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", @@ -82,11 +82,13 @@ go_test( "//pkg/proto/icas", "//pkg/testutil", "//pkg/util", + "//pkg/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@bazel_remote_apis//build/bazel/semver:semver_go_proto", "@com_github_aws_aws_sdk_go_v2//aws", "@com_github_aws_aws_sdk_go_v2_service_s3//:s3", "@com_github_aws_aws_sdk_go_v2_service_s3//types", + "@com_github_klauspost_compress//zstd", "@com_github_stretchr_testify//require", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", diff --git a/pkg/blobstore/configuration/BUILD.bazel b/pkg/blobstore/configuration/BUILD.bazel index 8274f3e3..33ebe095 100644 --- a/pkg/blobstore/configuration/BUILD.bazel +++ b/pkg/blobstore/configuration/BUILD.bazel @@ -45,6 +45,7 @@ go_library( "//pkg/proto/configuration/digest", "//pkg/random", "//pkg/util", + "//pkg/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@com_github_aws_aws_sdk_go_v2_service_s3//:s3", "@com_github_fxtlabs_primes//:primes", diff --git a/pkg/blobstore/configuration/cas_blob_access_creator.go b/pkg/blobstore/configuration/cas_blob_access_creator.go index 45f4f971..06d7e703 100644 --- a/pkg/blobstore/configuration/cas_blob_access_creator.go +++ b/pkg/blobstore/configuration/cas_blob_access_creator.go @@ -19,6 +19,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/program" pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore" "github.com/buildbarn/bb-storage/pkg/util" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "github.com/google/uuid" "google.golang.org/grpc/codes" @@ -38,18 +39,20 @@ type casBlobAccessCreator struct { casBlobReplicatorCreator maximumMessageSizeBytes int + zstdPool bb_zstd.Pool } // NewCASBlobAccessCreator creates a BlobAccessCreator that can be // provided to NewBlobAccessFromConfiguration() to construct a // BlobAccess that is suitable for accessing the Content Addressable // Storage. -func NewCASBlobAccessCreator(grpcClientFactory grpc.ClientFactory, maximumMessageSizeBytes int) BlobAccessCreator { +func NewCASBlobAccessCreator(grpcClientFactory grpc.ClientFactory, maximumMessageSizeBytes int, zstdPool bb_zstd.Pool) BlobAccessCreator { return &casBlobAccessCreator{ casBlobReplicatorCreator: casBlobReplicatorCreator{ grpcClientFactory: grpcClientFactory, }, maximumMessageSizeBytes: maximumMessageSizeBytes, + zstdPool: zstdPool, } } @@ -93,10 +96,14 @@ func (bac *casBlobAccessCreator) NewCustomBlobAccess(terminationGroup program.Gr if err != nil { return BlobAccessInfo{}, "", err } + var zstdPool bb_zstd.Pool + if backend.Grpc.EnableCompression { + zstdPool = bac.zstdPool + } // TODO: Should we provide a configuration option, so // that digest.KeyWithoutInstance can be used? return BlobAccessInfo{ - BlobAccess: grpcclients.NewCASBlobAccess(client, uuid.NewRandom, 64<<10, backend.Grpc.EnableCompression), + BlobAccess: grpcclients.NewCASBlobAccess(client, uuid.NewRandom, 64<<10, zstdPool), DigestKeyFormat: digest.KeyWithInstance, }, "grpc", nil case *pb.BlobAccessConfiguration_ReferenceExpanding: @@ -158,7 +165,8 @@ func (bac *casBlobAccessCreator) NewCustomBlobAccess(terminationGroup program.Gr }, s3.NewFromConfig(awsConfig), gcsClient, - bac.maximumMessageSizeBytes), + bac.maximumMessageSizeBytes, + bac.zstdPool), DigestKeyFormat: indirectContentAddressableStorage.DigestKeyFormat, }, "reference_expanding", nil default: diff --git a/pkg/blobstore/configuration/new_blob_access.go b/pkg/blobstore/configuration/new_blob_access.go index a0e8f529..2bd6d288 100644 --- a/pkg/blobstore/configuration/new_blob_access.go +++ b/pkg/blobstore/configuration/new_blob_access.go @@ -25,6 +25,7 @@ import ( digest_pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/digest" "github.com/buildbarn/bb-storage/pkg/random" "github.com/buildbarn/bb-storage/pkg/util" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "github.com/fxtlabs/primes" "google.golang.org/grpc/codes" @@ -602,11 +603,11 @@ func NewBlobAccessFromConfiguration(terminationGroup program.Group, configuratio // create BlobAccess objects for both the Content Addressable Storage // and Action Cache. Most Buildbarn components tend to require access to // both these data stores. -func NewCASAndACBlobAccessFromConfiguration(terminationGroup program.Group, configuration *pb.BlobstoreConfiguration, grpcClientFactory grpc.ClientFactory, maximumMessageSizeBytes int) (blobstore.BlobAccess, blobstore.BlobAccess, error) { +func NewCASAndACBlobAccessFromConfiguration(terminationGroup program.Group, configuration *pb.BlobstoreConfiguration, grpcClientFactory grpc.ClientFactory, maximumMessageSizeBytes int, zstdPool bb_zstd.Pool) (blobstore.BlobAccess, blobstore.BlobAccess, error) { contentAddressableStorage, err := NewBlobAccessFromConfiguration( terminationGroup, configuration.GetContentAddressableStorage(), - NewCASBlobAccessCreator(grpcClientFactory, maximumMessageSizeBytes)) + NewCASBlobAccessCreator(grpcClientFactory, maximumMessageSizeBytes, zstdPool)) if err != nil { return nil, nil, util.StatusWrap(err, "Failed to create Content Addressable Storage") } diff --git a/pkg/blobstore/grpcclients/BUILD.bazel b/pkg/blobstore/grpcclients/BUILD.bazel index bb43132e..257d8fd0 100644 --- a/pkg/blobstore/grpcclients/BUILD.bazel +++ b/pkg/blobstore/grpcclients/BUILD.bazel @@ -20,9 +20,9 @@ go_library( "//pkg/proto/icas", "//pkg/proto/iscc", "//pkg/util", + "//pkg/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@com_github_google_uuid//:uuid", - "@com_github_klauspost_compress//zstd", "@org_golang_google_genproto_googleapis_bytestream//:bytestream", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", @@ -41,6 +41,7 @@ go_test( "//pkg/digest", "//pkg/testutil", "//pkg/util", + "//pkg/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@bazel_remote_apis//build/bazel/semver:semver_go_proto", "@com_github_google_uuid//:uuid", diff --git a/pkg/blobstore/grpcclients/cas_blob_access.go b/pkg/blobstore/grpcclients/cas_blob_access.go index 160776ea..f35a0566 100644 --- a/pkg/blobstore/grpcclients/cas_blob_access.go +++ b/pkg/blobstore/grpcclients/cas_blob_access.go @@ -13,8 +13,8 @@ import ( "github.com/buildbarn/bb-storage/pkg/blobstore/slicing" "github.com/buildbarn/bb-storage/pkg/digest" "github.com/buildbarn/bb-storage/pkg/util" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "github.com/google/uuid" - "github.com/klauspost/compress/zstd" "google.golang.org/genproto/googleapis/bytestream" "google.golang.org/grpc" @@ -29,8 +29,8 @@ type casBlobAccess struct { capabilitiesClient remoteexecution.CapabilitiesClient uuidGenerator util.UUIDGenerator readChunkSize int - enableZSTDCompression bool supportedCompressors atomic.Pointer[[]remoteexecution.Compressor_Value] + zstdPool bb_zstd.Pool } // NewCASBlobAccess creates a BlobAccess handle that relays any requests @@ -39,16 +39,16 @@ type casBlobAccess struct { // services that Bazel uses to access blobs stored in the Content // Addressable Storage. // -// If enableZSTDCompression is true, the client will use ZSTD compression -// for ByteStream operations if the server supports it. -func NewCASBlobAccess(client grpc.ClientConnInterface, uuidGenerator util.UUIDGenerator, readChunkSize int, enableZSTDCompression bool) blobstore.BlobAccess { +// If zstdPool is non-nil, the client will use ZSTD compression for +// ByteStream operations if the server supports it. +func NewCASBlobAccess(client grpc.ClientConnInterface, uuidGenerator util.UUIDGenerator, readChunkSize int, zstdPool bb_zstd.Pool) blobstore.BlobAccess { return &casBlobAccess{ byteStreamClient: bytestream.NewByteStreamClient(client), contentAddressableStorageClient: remoteexecution.NewContentAddressableStorageClient(client), capabilitiesClient: remoteexecution.NewCapabilitiesClient(client), uuidGenerator: uuidGenerator, readChunkSize: readChunkSize, - enableZSTDCompression: enableZSTDCompression, + zstdPool: zstdPool, } } @@ -74,47 +74,20 @@ func (r *byteStreamChunkReader) Close() { } } +// zstdByteStreamChunkReader reads compressed data from a gRPC stream +// and decompresses it using a pooled decoder. type zstdByteStreamChunkReader struct { client bytestream.ByteStream_ReadClient cancel context.CancelFunc - zstdReader io.ReadCloser + decoder bb_zstd.Decoder + pipeReader *io.PipeReader readChunkSize int wg sync.WaitGroup } func (r *zstdByteStreamChunkReader) Read() ([]byte, error) { - if r.zstdReader == nil { - pr, pw := io.Pipe() - - r.wg.Add(1) - go func() { - defer r.wg.Done() - defer pw.Close() - for { - chunk, err := r.client.Recv() - if err != nil { - if err != io.EOF { - pw.CloseWithError(err) - } - return - } - if _, writeErr := pw.Write(chunk.Data); writeErr != nil { - pw.CloseWithError(writeErr) - return - } - } - }() - - var err error - r.zstdReader, err = util.NewZstdReadCloser(pr, zstd.WithDecoderConcurrency(1)) - if err != nil { - pr.Close() - return nil, err - } - } - buf := make([]byte, r.readChunkSize) - n, err := r.zstdReader.Read(buf) + n, err := r.decoder.Read(buf) if n > 0 { if err != nil && err != io.EOF { err = nil @@ -125,9 +98,9 @@ func (r *zstdByteStreamChunkReader) Read() ([]byte, error) { } func (r *zstdByteStreamChunkReader) Close() { - if r.zstdReader != nil { - r.zstdReader.Close() - } + r.decoder.Close() + + r.pipeReader.Close() r.cancel() // Drain the gRPC stream. @@ -179,7 +152,7 @@ const resourceNameHeader = "build.bazel.remote.execution.v2.resource-name" // shouldUseZSTDCompression checks if ZSTD compression should be used. // It ensures GetCapabilities has been called to negotiate compression support. func (ba *casBlobAccess) shouldUseZSTDCompression(ctx context.Context, digest digest.Digest) (bool, error) { - if !ba.enableZSTDCompression { + if ba.zstdPool == nil { return false, nil } @@ -220,11 +193,43 @@ func (ba *casBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.B } if useCompression { - return buffer.NewCASBufferFromChunkReader(digest, &zstdByteStreamChunkReader{ + pipeReader, pipeWriter := io.Pipe() + + r := &zstdByteStreamChunkReader{ client: client, cancel: cancel, + pipeReader: pipeReader, readChunkSize: ba.readChunkSize, - }, buffer.BackendProvided(buffer.Irreparable(digest))) + } + + // Start goroutine to read from gRPC and write to pipe. + r.wg.Add(1) + go func() { + defer r.wg.Done() + defer pipeWriter.Close() + for { + chunk, err := client.Recv() + if err != nil { + if err != io.EOF { + pipeWriter.CloseWithError(err) + } + return + } + if _, writeErr := pipeWriter.Write(chunk.Data); writeErr != nil { + return + } + } + }() + + decoder, err := ba.zstdPool.NewDecoder(ctx, pipeReader) + if err != nil { + pipeReader.CloseWithError(err) + cancel() + return buffer.NewBufferFromError(err) + } + r.decoder = decoder + + return buffer.NewCASBufferFromChunkReader(digest, r, buffer.BackendProvided(buffer.Irreparable(digest))) } return buffer.NewCASBufferFromChunkReader(digest, &byteStreamChunkReader{ @@ -269,20 +274,24 @@ func (ba *casBlobAccess) Put(ctx context.Context, digest digest.Digest, b buffer cancel: cancel, } - zstdWriter, err := zstd.NewWriter(byteStreamWriter, zstd.WithEncoderConcurrency(1)) + // Acquire encoder from pool (blocks if at capacity — provides backpressure). + encoder, err := ba.zstdPool.NewEncoder(ctx, byteStreamWriter) if err != nil { cancel() - client.CloseAndRecv() - return status.Errorf(codes.Internal, "Failed to create zstd writer: %v", err) + b.Discard() + if _, closeErr := client.CloseAndRecv(); closeErr != nil { + return status.Errorf(codes.Internal, "Failed to close client: %v and acquire encoder: %v", closeErr, err) + } + return status.Errorf(codes.ResourceExhausted, "Failed to acquire ZSTD encoder: %v", err) } - if err := b.IntoWriter(zstdWriter); err != nil { - zstdWriter.Close() + if err := b.IntoWriter(encoder); err != nil { + encoder.Close() byteStreamWriter.Close() return err } - if err := zstdWriter.Close(); err != nil { + if err := encoder.Close(); err != nil { byteStreamWriter.Close() return err } diff --git a/pkg/blobstore/grpcclients/cas_blob_access_test.go b/pkg/blobstore/grpcclients/cas_blob_access_test.go index 5fd7819c..4465d1de 100644 --- a/pkg/blobstore/grpcclients/cas_blob_access_test.go +++ b/pkg/blobstore/grpcclients/cas_blob_access_test.go @@ -3,7 +3,9 @@ package grpcclients_test import ( "context" "io" + "sync" "testing" + "time" remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" "github.com/bazelbuild/remote-apis/build/bazel/semver" @@ -13,6 +15,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/digest" "github.com/buildbarn/bb-storage/pkg/testutil" "github.com/buildbarn/bb-storage/pkg/util" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "github.com/google/uuid" "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/require" @@ -26,12 +29,20 @@ import ( "go.uber.org/mock/gomock" ) +func newTestZstdPool(maxEncoders, maxDecoders int64) bb_zstd.Pool { + return bb_zstd.NewBoundedPool( + maxEncoders, maxDecoders, + []zstd.EOption{zstd.WithEncoderConcurrency(1)}, + []zstd.DOption{zstd.WithDecoderConcurrency(1)}, + ) +} + func TestCASBlobAccessPut(t *testing.T) { ctrl, ctx := gomock.WithContext(context.Background(), t) client := mock.NewMockClientConnInterface(ctrl) uuidGenerator := mock.NewMockUUIDGenerator(ctrl) - blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, false) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, nil) blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) uuid := uuid.Must(uuid.Parse("7d659e5f-0e4b-48f0-ad9f-3489db6e103b")) @@ -169,7 +180,7 @@ func TestCASBlobAccessGet(t *testing.T) { client := mock.NewMockClientConnInterface(ctrl) uuidGenerator := mock.NewMockUUIDGenerator(ctrl) - blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, false) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, nil) t.Run("Success", func(t *testing.T) { blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) @@ -270,7 +281,7 @@ func TestCASBlobAccessGetCapabilities(t *testing.T) { client := mock.NewMockClientConnInterface(ctrl) uuidGenerator := mock.NewMockUUIDGenerator(ctrl) - blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, false) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, nil) t.Run("BackendFailure", func(t *testing.T) { client.EXPECT().Invoke( @@ -371,28 +382,9 @@ func TestCASBlobAccessPutWithCompression(t *testing.T) { client := mock.NewMockClientConnInterface(ctrl) uuidGenerator := mock.NewMockUUIDGenerator(ctrl) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, newTestZstdPool(16, 16)) - // Use compression threshold of 100 bytes to match the hardcoded value - blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, true) - - // Set up GetCapabilities to return ZSTD support - client.EXPECT().Invoke( - gomock.Any(), - "/build.bazel.remote.execution.v2.Capabilities/GetCapabilities", - gomock.Any(), - gomock.Any(), - gomock.Any(), - ).DoAndReturn(func(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { - proto.Merge(reply.(proto.Message), &remoteexecution.ServerCapabilities{ - CacheCapabilities: &remoteexecution.CacheCapabilities{ - DigestFunctions: digest.SupportedDigestFunctions, - SupportedCompressors: []remoteexecution.Compressor_Value{ - remoteexecution.Compressor_ZSTD, - }, - }, - }) - return nil - }).AnyTimes() + expectGetCapabilitiesWithZSTD(client) blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "1411ffd5854fa029dc4d231aa89311eb", 1000) testUUID := uuid.Must(uuid.Parse("7d659e5f-0e4b-48f0-ad9f-3489db6e103b")) @@ -442,14 +434,8 @@ func TestCASBlobAccessPutWithCompression(t *testing.T) { }) } -func TestCASBlobAccessGetWithCompression(t *testing.T) { - ctrl, ctx := gomock.WithContext(context.Background(), t) - - client := mock.NewMockClientConnInterface(ctrl) - uuidGenerator := mock.NewMockUUIDGenerator(ctrl) - blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 100, true) - - // Set up GetCapabilities to return ZSTD support +// expectGetCapabilitiesWithZSTD sets up a mock to return ZSTD in SupportedCompressors. +func expectGetCapabilitiesWithZSTD(client *mock.MockClientConnInterface) { client.EXPECT().Invoke( gomock.Any(), "/build.bazel.remote.execution.v2.Capabilities/GetCapabilities", @@ -467,6 +453,16 @@ func TestCASBlobAccessGetWithCompression(t *testing.T) { }) return nil }).AnyTimes() +} + +func TestCASBlobAccessGetWithCompression(t *testing.T) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + client := mock.NewMockClientConnInterface(ctrl) + uuidGenerator := mock.NewMockUUIDGenerator(ctrl) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 100, newTestZstdPool(16, 16)) + + expectGetCapabilitiesWithZSTD(client) t.Run("SuccessWithCompression", func(t *testing.T) { expectedData := make([]byte, 1000) @@ -503,3 +499,165 @@ func TestCASBlobAccessGetWithCompression(t *testing.T) { require.Equal(t, expectedData, data) }) } + +func TestCASBlobAccessPutPoolExhaustion(t *testing.T) { + // Create a pool with only 1 concurrent encoder to test backpressure. + pool := bb_zstd.NewBoundedPool(1, 1, nil, nil) + + ctrl, ctx := gomock.WithContext(context.Background(), t) + client := mock.NewMockClientConnInterface(ctrl) + uuidGenerator := mock.NewMockUUIDGenerator(ctrl) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, pool) + + expectGetCapabilitiesWithZSTD(client) + + largeData := make([]byte, 1000) + for i := range largeData { + largeData[i] = byte('A' + (i % 26)) + } + blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "1411ffd5854fa029dc4d231aa89311eb", 1000) + testUUID := uuid.Must(uuid.Parse("7d659e5f-0e4b-48f0-ad9f-3489db6e103b")) + + // Hold the only encoder slot by starting a Put that blocks on SendMsg. + sendBlocked := make(chan struct{}) + sendUnblock := make(chan struct{}) + + clientStream1 := mock.NewMockClientStream(ctrl) + uuidGenerator.EXPECT().Call().Return(testUUID, nil) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Write"). + Return(clientStream1, nil) + r1 := mock.NewMockFileReader(ctrl) + r1.EXPECT().ReadAt(gomock.Len(1000), int64(0)).DoAndReturn(func(p []byte, off int64) (int, error) { + copy(p, largeData) + return 1000, nil + }) + r1.EXPECT().Close() + + // First SendMsg blocks, holding the encoder slot. + clientStream1.EXPECT().SendMsg(gomock.Any()).DoAndReturn(func(msg interface{}) error { + close(sendBlocked) + <-sendUnblock + return nil + }) + clientStream1.EXPECT().SendMsg(gomock.Any()).Return(nil) // FinishWrite + clientStream1.EXPECT().CloseSend().Return(nil) + clientStream1.EXPECT().RecvMsg(gomock.Any()).Return(nil) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromReaderAt(r1, 1000)) + require.NoError(t, err) + }() + + // Wait for the first Put to acquire the encoder and block on SendMsg. + <-sendBlocked + + // Second Put should fail because the pool is exhausted and the context expires. + shortCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + + r2 := mock.NewMockFileReader(ctrl) + r2.EXPECT().Close() + + // This Put needs a new stream — but it will fail before using it because + // AcquireEncoder blocks and the context times out. + clientStream2 := mock.NewMockClientStream(ctrl) + uuidGenerator.EXPECT().Call().Return(testUUID, nil) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Write"). + Return(clientStream2, nil) + clientStream2.EXPECT().CloseSend().Return(nil).AnyTimes() + clientStream2.EXPECT().RecvMsg(gomock.Any()).Return(nil).AnyTimes() + + err := blobAccess.Put(shortCtx, blobDigest, buffer.NewValidatedBufferFromReaderAt(r2, 1000)) + require.Error(t, err) + require.Equal(t, codes.ResourceExhausted, status.Code(err)) + + // Unblock the first Put so it can complete. + close(sendUnblock) + wg.Wait() +} + +func TestCASBlobAccessPutPoolReleasesEncoder(t *testing.T) { + // Pool with 1 encoder: if encoder isn't released after the first Put, + // the second Put would deadlock. + pool := bb_zstd.NewBoundedPool(1, 1, nil, nil) + + ctrl, ctx := gomock.WithContext(context.Background(), t) + client := mock.NewMockClientConnInterface(ctrl) + uuidGenerator := mock.NewMockUUIDGenerator(ctrl) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 10, pool) + + expectGetCapabilitiesWithZSTD(client) + + largeData := make([]byte, 1000) + for i := range largeData { + largeData[i] = byte('A' + (i % 26)) + } + blobDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "1411ffd5854fa029dc4d231aa89311eb", 1000) + testUUID := uuid.Must(uuid.Parse("7d659e5f-0e4b-48f0-ad9f-3489db6e103b")) + + // Do two sequential Puts, both must succeed with pool size 1. + for i := 0; i < 2; i++ { + clientStream := mock.NewMockClientStream(ctrl) + uuidGenerator.EXPECT().Call().Return(testUUID, nil) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Write"). + Return(clientStream, nil) + r := mock.NewMockFileReader(ctrl) + r.EXPECT().ReadAt(gomock.Len(1000), int64(0)).DoAndReturn(func(p []byte, off int64) (int, error) { + copy(p, largeData) + return 1000, nil + }) + r.EXPECT().Close() + clientStream.EXPECT().SendMsg(gomock.Any()).Return(nil).Times(2) // data + finish + clientStream.EXPECT().CloseSend().Return(nil) + clientStream.EXPECT().RecvMsg(gomock.Any()).Return(nil) + + err := blobAccess.Put(ctx, blobDigest, buffer.NewValidatedBufferFromReaderAt(r, 1000)) + require.NoError(t, err, "Put #%d should succeed", i+1) + } +} + +func TestCASBlobAccessGetPoolReleasesDecoder(t *testing.T) { + // Pool with 1 decoder: if decoder isn't released after the first Get, + // the second Get would deadlock. + pool := bb_zstd.NewBoundedPool(1, 1, nil, nil) + + ctrl, ctx := gomock.WithContext(context.Background(), t) + client := mock.NewMockClientConnInterface(ctrl) + uuidGenerator := mock.NewMockUUIDGenerator(ctrl) + blobAccess := grpcclients.NewCASBlobAccess(client, uuidGenerator.Call, 100, pool) + + expectGetCapabilitiesWithZSTD(client) + + expectedData := make([]byte, 1000) + for i := range expectedData { + expectedData[i] = byte('A' + (i % 26)) + } + largeDigest := digest.MustNewDigest("hello", remoteexecution.DigestFunction_MD5, "1411ffd5854fa029dc4d231aa89311eb", 1000) + + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) + require.NoError(t, err) + compressedData := encoder.EncodeAll(expectedData, nil) + + // Do two sequential Gets, both must succeed with pool size 1. + for i := 0; i < 2; i++ { + clientStream := mock.NewMockClientStream(ctrl) + client.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/google.bytestream.ByteStream/Read"). + Return(clientStream, nil) + clientStream.EXPECT().SendMsg(gomock.Any()).Return(nil) + clientStream.EXPECT().RecvMsg(gomock.Any()).DoAndReturn(func(m interface{}) error { + resp := m.(*bytestream.ReadResponse) + resp.Data = compressedData + return nil + }) + clientStream.EXPECT().RecvMsg(gomock.Any()).Return(io.EOF).AnyTimes() + clientStream.EXPECT().CloseSend().Return(nil) + + buf := blobAccess.Get(ctx, largeDigest) + data, err := buf.ToByteSlice(1500) + require.NoError(t, err, "Get #%d should succeed", i+1) + require.Equal(t, expectedData, data) + } +} diff --git a/pkg/blobstore/grpcservers/BUILD.bazel b/pkg/blobstore/grpcservers/BUILD.bazel index 28cea3be..556ad91e 100644 --- a/pkg/blobstore/grpcservers/BUILD.bazel +++ b/pkg/blobstore/grpcservers/BUILD.bazel @@ -20,8 +20,8 @@ go_library( "//pkg/proto/icas", "//pkg/proto/iscc", "//pkg/util", + "//pkg/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", - "@com_github_klauspost_compress//zstd", "@org_golang_google_genproto_googleapis_bytestream//:bytestream", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", @@ -43,6 +43,7 @@ go_test( "//pkg/digest", "//pkg/proto/icas", "//pkg/testutil", + "//pkg/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", "@com_github_klauspost_compress//zstd", "@com_github_stretchr_testify//require", diff --git a/pkg/blobstore/grpcservers/byte_stream_server.go b/pkg/blobstore/grpcservers/byte_stream_server.go index b26e2bd4..8e377bb7 100644 --- a/pkg/blobstore/grpcservers/byte_stream_server.go +++ b/pkg/blobstore/grpcservers/byte_stream_server.go @@ -9,8 +9,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/blobstore" "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" "github.com/buildbarn/bb-storage/pkg/digest" - "github.com/buildbarn/bb-storage/pkg/util" - "github.com/klauspost/compress/zstd" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "google.golang.org/genproto/googleapis/bytestream" "google.golang.org/grpc/codes" @@ -20,15 +19,17 @@ import ( type byteStreamServer struct { blobAccess blobstore.BlobAccess readChunkSize int + zstdPool bb_zstd.Pool } // NewByteStreamServer creates a GRPC service for reading blobs from and // writing blobs to a BlobAccess. It is used by Bazel to access the // Content Addressable Storage (CAS). -func NewByteStreamServer(blobAccess blobstore.BlobAccess, readChunkSize int) bytestream.ByteStreamServer { +func NewByteStreamServer(blobAccess blobstore.BlobAccess, readChunkSize int, zstdPool bb_zstd.Pool) bytestream.ByteStreamServer { return &byteStreamServer{ blobAccess: blobAccess, readChunkSize: readChunkSize, + zstdPool: zstdPool, } } @@ -40,9 +41,10 @@ func (s *byteStreamServer) Read(in *bytestream.ReadRequest, out bytestream.ByteS if err != nil { return err } + ctx := out.Context() switch compressor { case remoteexecution.Compressor_IDENTITY: - r := s.blobAccess.Get(out.Context(), digest).ToChunkReader(in.ReadOffset, s.readChunkSize) + r := s.blobAccess.Get(ctx, digest).ToChunkReader(in.ReadOffset, s.readChunkSize) defer r.Close() for { @@ -59,13 +61,14 @@ func (s *byteStreamServer) Read(in *bytestream.ReadRequest, out bytestream.ByteS } case remoteexecution.Compressor_ZSTD: - b := s.blobAccess.Get(out.Context(), digest) - zstdWriter, err := zstd.NewWriter(&readStreamWriter{out: out}, zstd.WithEncoderConcurrency(1)) + b := s.blobAccess.Get(ctx, digest) + encoder, err := s.zstdPool.NewEncoder(ctx, &readStreamWriter{out: out}) if err != nil { - return status.Errorf(codes.Internal, "Failed to create zstd writer: %v", err) + b.Discard() + return status.Errorf(codes.ResourceExhausted, "Failed to acquire ZSTD encoder: %v", err) } - defer zstdWriter.Close() - return b.IntoWriter(zstdWriter) + defer encoder.Close() + return b.IntoWriter(encoder) default: return status.Errorf(codes.Unimplemented, "This service does not support downloading compression type: %s", compressor) } @@ -207,6 +210,7 @@ func (zstdWriteStreamReader) Close() error { } func (s *byteStreamServer) writeZstd(stream bytestream.ByteStream_WriteServer, request *bytestream.WriteRequest, digest digest.Digest) error { + ctx := stream.Context() streamReader := &zstdWriteStreamReader{ stream: stream, nextOffset: int64(len(request.Data)), @@ -214,14 +218,14 @@ func (s *byteStreamServer) writeZstd(stream bytestream.ByteStream_WriteServer, r pendingData: request.Data, } - zstdReader, err := util.NewZstdReadCloser(streamReader, zstd.WithDecoderConcurrency(1)) + zstdReader, err := bb_zstd.NewReadCloser(ctx, s.zstdPool, streamReader) if err != nil { - return err + return status.Errorf(codes.ResourceExhausted, "Failed to acquire ZSTD decoder: %v", err) } defer zstdReader.Close() if err := s.blobAccess.Put( - stream.Context(), + ctx, digest, buffer.NewCASBufferFromReader(digest, zstdReader, buffer.UserProvided)); err != nil { return err diff --git a/pkg/blobstore/grpcservers/byte_stream_server_test.go b/pkg/blobstore/grpcservers/byte_stream_server_test.go index d840d671..542eb40f 100644 --- a/pkg/blobstore/grpcservers/byte_stream_server_test.go +++ b/pkg/blobstore/grpcservers/byte_stream_server_test.go @@ -14,6 +14,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/blobstore/grpcservers" "github.com/buildbarn/bb-storage/pkg/digest" "github.com/buildbarn/bb-storage/pkg/testutil" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/require" @@ -33,7 +34,10 @@ func TestByteStreamServer(t *testing.T) { l := bufconn.Listen(1 << 20) server := grpc.NewServer() blobAccess := mock.NewMockBlobAccess(ctrl) - bytestream.RegisterByteStreamServer(server, grpcservers.NewByteStreamServer(blobAccess, 10)) + bytestream.RegisterByteStreamServer(server, grpcservers.NewByteStreamServer(blobAccess, 10, bb_zstd.NewUnboundedPool( + []zstd.EOption{zstd.WithEncoderConcurrency(1)}, + []zstd.DOption{zstd.WithDecoderConcurrency(1)}, + ))) go func() { require.NoError(t, server.Serve(l)) }() diff --git a/pkg/blobstore/reference_expanding_blob_access.go b/pkg/blobstore/reference_expanding_blob_access.go index b33dbbbb..340073a6 100644 --- a/pkg/blobstore/reference_expanding_blob_access.go +++ b/pkg/blobstore/reference_expanding_blob_access.go @@ -18,7 +18,7 @@ import ( "github.com/buildbarn/bb-storage/pkg/digest" "github.com/buildbarn/bb-storage/pkg/proto/icas" "github.com/buildbarn/bb-storage/pkg/util" - "github.com/klauspost/compress/zstd" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -31,6 +31,7 @@ type referenceExpandingBlobAccess struct { s3Client cloud_aws.S3Client gcsClient cloud_gcp.StorageClient maximumMessageSizeBytes int + zstdPool bb_zstd.Pool } // getHTTPRangeHeader creates a HTTP Range header based on the offset @@ -47,13 +48,14 @@ func getHTTPRangeHeader(reference *icas.Reference) string { // Storage (CAS) backend. Any object requested through this BlobAccess // will cause its reference to be loaded from the ICAS, followed by // fetching its data from the referenced location. -func NewReferenceExpandingBlobAccess(indirectContentAddressableStorage, contentAddressableStorage BlobAccess, httpClient *http.Client, s3Client cloud_aws.S3Client, gcsClient cloud_gcp.StorageClient, maximumMessageSizeBytes int) BlobAccess { +func NewReferenceExpandingBlobAccess(indirectContentAddressableStorage, contentAddressableStorage BlobAccess, httpClient *http.Client, s3Client cloud_aws.S3Client, gcsClient cloud_gcp.StorageClient, maximumMessageSizeBytes int, zstdPool bb_zstd.Pool) BlobAccess { return &referenceExpandingBlobAccess{ indirectContentAddressableStorage: indirectContentAddressableStorage, contentAddressableStorage: contentAddressableStorage, httpClient: httpClient, s3Client: s3Client, gcsClient: gcsClient, + zstdPool: zstdPool, maximumMessageSizeBytes: maximumMessageSizeBytes, } } @@ -158,7 +160,7 @@ func (ba *referenceExpandingBlobAccess) Get(ctx context.Context, blobDigest dige // GOMAXPROCS. We should just use a single thread, // because many BlobAccess operations may run in // parallel. - decoder, err := util.NewZstdReadCloser(r, zstd.WithDecoderConcurrency(1), zstd.WithDecoderLowmem(true)) + decoder, err := bb_zstd.NewReadCloser(ctx, ba.zstdPool, r) if err != nil { r.Close() return buffer.NewBufferFromError(util.StatusWrapWithCode(err, codes.Internal, "Failed to create Zstandard decoder")) diff --git a/pkg/blobstore/reference_expanding_blob_access_test.go b/pkg/blobstore/reference_expanding_blob_access_test.go index 60b042ce..32ff7c7a 100644 --- a/pkg/blobstore/reference_expanding_blob_access_test.go +++ b/pkg/blobstore/reference_expanding_blob_access_test.go @@ -18,6 +18,8 @@ import ( "github.com/buildbarn/bb-storage/pkg/digest" "github.com/buildbarn/bb-storage/pkg/proto/icas" "github.com/buildbarn/bb-storage/pkg/testutil" + bb_zstd "github.com/buildbarn/bb-storage/pkg/zstd" + "github.com/klauspost/compress/zstd" "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" @@ -40,7 +42,8 @@ func TestReferenceExpandingBlobAccessGet(t *testing.T) { &http.Client{Transport: roundTripper}, s3Client, gcsClient, - 100) + 100, + bb_zstd.NewUnboundedPool(nil, []zstd.DOption{zstd.WithDecoderConcurrency(1)})) helloDigest := digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) t.Run("BackendError", func(t *testing.T) { @@ -493,7 +496,8 @@ func TestReferenceExpandingBlobAccessPut(t *testing.T) { &http.Client{Transport: roundTripper}, s3Client, gcsClient, - 100) + 100, + bb_zstd.NewUnboundedPool(nil, []zstd.DOption{zstd.WithDecoderConcurrency(1)})) t.Run("Failure", func(t *testing.T) { // It is not possible to write objects using @@ -527,7 +531,8 @@ func TestReferenceExpandingBlobAccessFindMissing(t *testing.T) { &http.Client{Transport: roundTripper}, s3Client, gcsClient, - 100) + 100, + bb_zstd.NewUnboundedPool(nil, []zstd.DOption{zstd.WithDecoderConcurrency(1)})) digests := digest.NewSetBuilder(). Add(digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5)). diff --git a/pkg/proto/configuration/bb_storage/BUILD.bazel b/pkg/proto/configuration/bb_storage/BUILD.bazel index 6cca4628..8e554777 100644 --- a/pkg/proto/configuration/bb_storage/BUILD.bazel +++ b/pkg/proto/configuration/bb_storage/BUILD.bazel @@ -13,6 +13,7 @@ proto_library( "//pkg/proto/configuration/builder:builder_proto", "//pkg/proto/configuration/global:global_proto", "//pkg/proto/configuration/grpc:grpc_proto", + "//pkg/proto/configuration/zstd:zstd_proto", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_proto", ], ) @@ -28,6 +29,7 @@ go_proto_library( "//pkg/proto/configuration/builder", "//pkg/proto/configuration/global", "//pkg/proto/configuration/grpc", + "//pkg/proto/configuration/zstd", "@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto", ], ) diff --git a/pkg/proto/configuration/bb_storage/bb_storage.pb.go b/pkg/proto/configuration/bb_storage/bb_storage.pb.go index 220d6880..5f8e885f 100644 --- a/pkg/proto/configuration/bb_storage/bb_storage.pb.go +++ b/pkg/proto/configuration/bb_storage/bb_storage.pb.go @@ -13,6 +13,7 @@ import ( builder "github.com/buildbarn/bb-storage/pkg/proto/configuration/builder" global "github.com/buildbarn/bb-storage/pkg/proto/configuration/global" grpc "github.com/buildbarn/bb-storage/pkg/proto/configuration/grpc" + zstd "github.com/buildbarn/bb-storage/pkg/proto/configuration/zstd" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" @@ -40,6 +41,7 @@ type ApplicationConfiguration struct { FileSystemAccessCache *NonScannableBlobAccessConfiguration `protobuf:"bytes,19,opt,name=file_system_access_cache,json=fileSystemAccessCache,proto3" json:"file_system_access_cache,omitempty"` ExecuteAuthorizer *auth.AuthorizerConfiguration `protobuf:"bytes,16,opt,name=execute_authorizer,json=executeAuthorizer,proto3" json:"execute_authorizer,omitempty"` SupportedCompressors []v2.Compressor_Value `protobuf:"varint,20,rep,packed,name=supported_compressors,json=supportedCompressors,proto3,enum=build.bazel.remote.execution.v2.Compressor_Value" json:"supported_compressors,omitempty"` + ZstdPool *zstd.PoolConfiguration `protobuf:"bytes,21,opt,name=zstd_pool,json=zstdPool,proto3" json:"zstd_pool,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -151,6 +153,13 @@ func (x *ApplicationConfiguration) GetSupportedCompressors() []v2.Compressor_Val return nil } +func (x *ApplicationConfiguration) GetZstdPool() *zstd.PoolConfiguration { + if x != nil { + return x.ZstdPool + } + return nil +} + type NonScannableBlobAccessConfiguration struct { state protoimpl.MessageState `protogen:"open.v1"` Backend *blobstore.BlobAccessConfiguration `protobuf:"bytes,1,opt,name=backend,proto3" json:"backend,omitempty"` @@ -283,8 +292,7 @@ var File_github_com_buildbarn_bb_storage_pkg_proto_configuration_bb_storage_bb_s const file_github_com_buildbarn_bb_storage_pkg_proto_configuration_bb_storage_bb_storage_proto_rawDesc = "" + "\n" + - "Sgithub.com/buildbarn/bb-storage/pkg/proto/configuration/bb_storage/bb_storage.proto\x12\"buildbarn.configuration.bb_storage\x1a6build/bazel/remote/execution/v2/remote_execution.proto\x1aGgithub.com/buildbarn/bb-storage/pkg/proto/configuration/auth/auth.proto\x1aQgithub.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore/blobstore.proto\x1aMgithub.com/buildbarn/bb-storage/pkg/proto/configuration/builder/builder.proto\x1aKgithub.com/buildbarn/bb-storage/pkg/proto/configuration/global/global.proto\x1aGgithub.com/buildbarn/bb-storage/pkg/proto/configuration/grpc/grpc.proto\"\xef\n" + - "\n" + + "Sgithub.com/buildbarn/bb-storage/pkg/proto/configuration/bb_storage/bb_storage.proto\x12\"buildbarn.configuration.bb_storage\x1a6build/bazel/remote/execution/v2/remote_execution.proto\x1aGgithub.com/buildbarn/bb-storage/pkg/proto/configuration/auth/auth.proto\x1aQgithub.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore/blobstore.proto\x1aMgithub.com/buildbarn/bb-storage/pkg/proto/configuration/builder/builder.proto\x1aKgithub.com/buildbarn/bb-storage/pkg/proto/configuration/global/global.proto\x1aGgithub.com/buildbarn/bb-storage/pkg/proto/configuration/grpc/grpc.proto\x1aGgithub.com/buildbarn/bb-storage/pkg/proto/configuration/zstd/zstd.proto\"\xbd\v\n" + "\x18ApplicationConfiguration\x12T\n" + "\fgrpc_servers\x18\x04 \x03(\v21.buildbarn.configuration.grpc.ServerConfigurationR\vgrpcServers\x12l\n" + "\n" + @@ -299,7 +307,8 @@ const file_github_com_buildbarn_bb_storage_pkg_proto_configuration_bb_storage_bb "\x18initial_size_class_cache\x18\v \x01(\v2G.buildbarn.configuration.bb_storage.NonScannableBlobAccessConfigurationR\x15initialSizeClassCache\x12\x80\x01\n" + "\x18file_system_access_cache\x18\x13 \x01(\v2G.buildbarn.configuration.bb_storage.NonScannableBlobAccessConfigurationR\x15fileSystemAccessCache\x12d\n" + "\x12execute_authorizer\x18\x10 \x01(\v25.buildbarn.configuration.auth.AuthorizerConfigurationR\x11executeAuthorizer\x12f\n" + - "\x15supported_compressors\x18\x14 \x03(\x0e21.build.bazel.remote.execution.v2.Compressor.ValueR\x14supportedCompressors\x1av\n" + + "\x15supported_compressors\x18\x14 \x03(\x0e21.build.bazel.remote.execution.v2.Compressor.ValueR\x14supportedCompressors\x12L\n" + + "\tzstd_pool\x18\x15 \x01(\v2/.buildbarn.configuration.zstd.PoolConfigurationR\bzstdPool\x1av\n" + "\x0fSchedulersEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12M\n" + "\x05value\x18\x02 \x01(\v27.buildbarn.configuration.builder.SchedulerConfigurationR\x05value:\x028\x01J\x04\b\x01\x10\x02J\x04\b\x02\x10\x03J\x04\b\x03\x10\x04J\x04\b\x06\x10\aJ\x04\b\a\x10\bJ\x04\b\f\x10\rJ\x04\b\r\x10\x0eJ\x04\b\x0e\x10\x0fJ\x04\b\x0f\x10\x10\"\xb7\x02\n" + @@ -335,8 +344,9 @@ var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_bb_storage_bb_s (*global.Configuration)(nil), // 5: buildbarn.configuration.global.Configuration (*auth.AuthorizerConfiguration)(nil), // 6: buildbarn.configuration.auth.AuthorizerConfiguration (v2.Compressor_Value)(0), // 7: build.bazel.remote.execution.v2.Compressor.Value - (*blobstore.BlobAccessConfiguration)(nil), // 8: buildbarn.configuration.blobstore.BlobAccessConfiguration - (*builder.SchedulerConfiguration)(nil), // 9: buildbarn.configuration.builder.SchedulerConfiguration + (*zstd.PoolConfiguration)(nil), // 8: buildbarn.configuration.zstd.PoolConfiguration + (*blobstore.BlobAccessConfiguration)(nil), // 9: buildbarn.configuration.blobstore.BlobAccessConfiguration + (*builder.SchedulerConfiguration)(nil), // 10: buildbarn.configuration.builder.SchedulerConfiguration } var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_bb_storage_bb_storage_proto_depIdxs = []int32{ 4, // 0: buildbarn.configuration.bb_storage.ApplicationConfiguration.grpc_servers:type_name -> buildbarn.configuration.grpc.ServerConfiguration @@ -349,19 +359,20 @@ var file_github_com_buildbarn_bb_storage_pkg_proto_configuration_bb_storage_bb_s 1, // 7: buildbarn.configuration.bb_storage.ApplicationConfiguration.file_system_access_cache:type_name -> buildbarn.configuration.bb_storage.NonScannableBlobAccessConfiguration 6, // 8: buildbarn.configuration.bb_storage.ApplicationConfiguration.execute_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration 7, // 9: buildbarn.configuration.bb_storage.ApplicationConfiguration.supported_compressors:type_name -> build.bazel.remote.execution.v2.Compressor.Value - 8, // 10: buildbarn.configuration.bb_storage.NonScannableBlobAccessConfiguration.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration - 6, // 11: buildbarn.configuration.bb_storage.NonScannableBlobAccessConfiguration.get_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration - 6, // 12: buildbarn.configuration.bb_storage.NonScannableBlobAccessConfiguration.put_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration - 8, // 13: buildbarn.configuration.bb_storage.ScannableBlobAccessConfiguration.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration - 6, // 14: buildbarn.configuration.bb_storage.ScannableBlobAccessConfiguration.get_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration - 6, // 15: buildbarn.configuration.bb_storage.ScannableBlobAccessConfiguration.put_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration - 6, // 16: buildbarn.configuration.bb_storage.ScannableBlobAccessConfiguration.find_missing_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration - 9, // 17: buildbarn.configuration.bb_storage.ApplicationConfiguration.SchedulersEntry.value:type_name -> buildbarn.configuration.builder.SchedulerConfiguration - 18, // [18:18] is the sub-list for method output_type - 18, // [18:18] is the sub-list for method input_type - 18, // [18:18] is the sub-list for extension type_name - 18, // [18:18] is the sub-list for extension extendee - 0, // [0:18] is the sub-list for field type_name + 8, // 10: buildbarn.configuration.bb_storage.ApplicationConfiguration.zstd_pool:type_name -> buildbarn.configuration.zstd.PoolConfiguration + 9, // 11: buildbarn.configuration.bb_storage.NonScannableBlobAccessConfiguration.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration + 6, // 12: buildbarn.configuration.bb_storage.NonScannableBlobAccessConfiguration.get_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration + 6, // 13: buildbarn.configuration.bb_storage.NonScannableBlobAccessConfiguration.put_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration + 9, // 14: buildbarn.configuration.bb_storage.ScannableBlobAccessConfiguration.backend:type_name -> buildbarn.configuration.blobstore.BlobAccessConfiguration + 6, // 15: buildbarn.configuration.bb_storage.ScannableBlobAccessConfiguration.get_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration + 6, // 16: buildbarn.configuration.bb_storage.ScannableBlobAccessConfiguration.put_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration + 6, // 17: buildbarn.configuration.bb_storage.ScannableBlobAccessConfiguration.find_missing_authorizer:type_name -> buildbarn.configuration.auth.AuthorizerConfiguration + 10, // 18: buildbarn.configuration.bb_storage.ApplicationConfiguration.SchedulersEntry.value:type_name -> buildbarn.configuration.builder.SchedulerConfiguration + 19, // [19:19] is the sub-list for method output_type + 19, // [19:19] is the sub-list for method input_type + 19, // [19:19] is the sub-list for extension type_name + 19, // [19:19] is the sub-list for extension extendee + 0, // [0:19] is the sub-list for field type_name } func init() { diff --git a/pkg/proto/configuration/bb_storage/bb_storage.proto b/pkg/proto/configuration/bb_storage/bb_storage.proto index f227b410..365c7552 100644 --- a/pkg/proto/configuration/bb_storage/bb_storage.proto +++ b/pkg/proto/configuration/bb_storage/bb_storage.proto @@ -8,6 +8,7 @@ import "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore/blobst import "github.com/buildbarn/bb-storage/pkg/proto/configuration/builder/builder.proto"; import "github.com/buildbarn/bb-storage/pkg/proto/configuration/global/global.proto"; import "github.com/buildbarn/bb-storage/pkg/proto/configuration/grpc/grpc.proto"; +import "github.com/buildbarn/bb-storage/pkg/proto/configuration/zstd/zstd.proto"; option go_package = "github.com/buildbarn/bb-storage/pkg/proto/configuration/bb_storage"; @@ -102,6 +103,12 @@ message ApplicationConfiguration { // Support for IDENTITY (i.e., no compression) is implied. repeated build.bazel.remote.execution.v2.Compressor.Value supported_compressors = 20; + + // ZSTD encoder/decoder pool configuration. When set, creates a + // process-wide pool shared by all gRPC CAS clients and the + // ByteStream server, and enables ZSTD compression for ByteStream + // operations where the server supports it. + buildbarn.configuration.zstd.PoolConfiguration zstd_pool = 21; } // Storage configuration for backends which don't allow batch digest diff --git a/pkg/proto/configuration/zstd/BUILD.bazel b/pkg/proto/configuration/zstd/BUILD.bazel new file mode 100644 index 00000000..6ac41ecb --- /dev/null +++ b/pkg/proto/configuration/zstd/BUILD.bazel @@ -0,0 +1,24 @@ +load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//proto:def.bzl", "go_proto_library") +load("@rules_proto//proto:defs.bzl", "proto_library") + +proto_library( + name = "zstd_proto", + srcs = ["zstd.proto"], + import_prefix = "github.com/buildbarn/bb-storage", + visibility = ["//visibility:public"], +) + +go_proto_library( + name = "zstd_go_proto", + importpath = "github.com/buildbarn/bb-storage/pkg/proto/configuration/zstd", + proto = ":zstd_proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "zstd", + embed = [":zstd_go_proto"], + importpath = "github.com/buildbarn/bb-storage/pkg/proto/configuration/zstd", + visibility = ["//visibility:public"], +) diff --git a/pkg/proto/configuration/zstd/zstd.pb.go b/pkg/proto/configuration/zstd/zstd.pb.go new file mode 100644 index 00000000..be074397 --- /dev/null +++ b/pkg/proto/configuration/zstd/zstd.pb.go @@ -0,0 +1,158 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v6.33.4 +// source: github.com/buildbarn/bb-storage/pkg/proto/configuration/zstd/zstd.proto + +package zstd + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PoolConfiguration struct { + state protoimpl.MessageState `protogen:"open.v1"` + MaximumEncoders int64 `protobuf:"varint,1,opt,name=maximum_encoders,json=maximumEncoders,proto3" json:"maximum_encoders,omitempty"` + MaximumDecoders int64 `protobuf:"varint,2,opt,name=maximum_decoders,json=maximumDecoders,proto3" json:"maximum_decoders,omitempty"` + EncoderWindowSizeBytes int32 `protobuf:"varint,3,opt,name=encoder_window_size_bytes,json=encoderWindowSizeBytes,proto3" json:"encoder_window_size_bytes,omitempty"` + DecoderWindowSizeBytes int32 `protobuf:"varint,4,opt,name=decoder_window_size_bytes,json=decoderWindowSizeBytes,proto3" json:"decoder_window_size_bytes,omitempty"` + EncoderLevel int32 `protobuf:"varint,5,opt,name=encoder_level,json=encoderLevel,proto3" json:"encoder_level,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PoolConfiguration) Reset() { + *x = PoolConfiguration{} + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_zstd_zstd_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PoolConfiguration) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PoolConfiguration) ProtoMessage() {} + +func (x *PoolConfiguration) ProtoReflect() protoreflect.Message { + mi := &file_github_com_buildbarn_bb_storage_pkg_proto_configuration_zstd_zstd_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PoolConfiguration.ProtoReflect.Descriptor instead. +func (*PoolConfiguration) Descriptor() ([]byte, []int) { + return file_github_com_buildbarn_bb_storage_pkg_proto_configuration_zstd_zstd_proto_rawDescGZIP(), []int{0} +} + +func (x *PoolConfiguration) GetMaximumEncoders() int64 { + if x != nil { + return x.MaximumEncoders + } + return 0 +} + +func (x *PoolConfiguration) GetMaximumDecoders() int64 { + if x != nil { + return x.MaximumDecoders + } + return 0 +} + +func (x *PoolConfiguration) GetEncoderWindowSizeBytes() int32 { + if x != nil { + return x.EncoderWindowSizeBytes + } + return 0 +} + +func (x *PoolConfiguration) GetDecoderWindowSizeBytes() int32 { + if x != nil { + return x.DecoderWindowSizeBytes + } + return 0 +} + +func (x *PoolConfiguration) GetEncoderLevel() int32 { + if x != nil { + return x.EncoderLevel + } + return 0 +} + +var File_github_com_buildbarn_bb_storage_pkg_proto_configuration_zstd_zstd_proto protoreflect.FileDescriptor + +const file_github_com_buildbarn_bb_storage_pkg_proto_configuration_zstd_zstd_proto_rawDesc = "" + + "\n" + + "Ggithub.com/buildbarn/bb-storage/pkg/proto/configuration/zstd/zstd.proto\x12\x1cbuildbarn.configuration.zstd\"\x84\x02\n" + + "\x11PoolConfiguration\x12)\n" + + "\x10maximum_encoders\x18\x01 \x01(\x03R\x0fmaximumEncoders\x12)\n" + + "\x10maximum_decoders\x18\x02 \x01(\x03R\x0fmaximumDecoders\x129\n" + + "\x19encoder_window_size_bytes\x18\x03 \x01(\x05R\x16encoderWindowSizeBytes\x129\n" + + "\x19decoder_window_size_bytes\x18\x04 \x01(\x05R\x16decoderWindowSizeBytes\x12#\n" + + "\rencoder_level\x18\x05 \x01(\x05R\fencoderLevelB>Z