From ab6ff10adb1a137d5142f281f711d0fcb4fbbca2 Mon Sep 17 00:00:00 2001 From: Sharmila Jesupaul Date: Mon, 9 Mar 2026 18:43:26 -0700 Subject: [PATCH] dbcleanup: add a dedicated cleanup worker mode `bb-portal` currently starts the database cleanup loop inside every serving process. In a multi-replica deployment that multiplies cleanup work, makes compaction backlogs more expensive, and ties cleanup failures directly to the request-serving pods. Add a `--cleanup-worker-only` mode that starts only the BES cleanup loop. This lets deployments run cleanup in a separate singleton worker pod while leaving normal `bb-portal` replicas focused on serving traffic. Also put a small ordered batch limit on `CompactLogs()`. A single cleanup pass should not try to compact every eligible invocation in one shot when there is a large backlog. --- README.md | 7 ++ cmd/bb_portal/main.go | 103 ++++++++++++++---- .../database/dbcleanupservice/compact_logs.go | 4 + .../dbcleanupservice/dbcleanupservice_test.go | 50 +++++++++ .../database/dbcleanupservice/export_test.go | 11 ++ 5 files changed, 154 insertions(+), 21 deletions(-) create mode 100644 internal/database/dbcleanupservice/export_test.go diff --git a/README.md b/README.md index c13fdedf..5f4ea4b7 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,13 @@ bazel run //cmd/bb_portal -- $PWD/config/portal.jsonnet The backend runs a reverse proxy for the frontend. 
+To run the cleanup loop in a dedicated worker process instead of the serving +process, start the same binary in cleanup-worker mode: + +``` +bazel run //cmd/bb_portal -- --cleanup-worker-only $PWD/config/portal.jsonnet +``` + ### Running the Frontend From `./frontend`, run: diff --git a/cmd/bb_portal/main.go b/cmd/bb_portal/main.go index 01c19933..e9e4efbc 100644 --- a/cmd/bb_portal/main.go +++ b/cmd/bb_portal/main.go @@ -2,6 +2,8 @@ package main import ( "context" + "flag" + "io" "log" "net/http" "net/http/httputil" @@ -56,16 +58,18 @@ import ( const ( readHeaderTimeout = 3 * time.Second folderPermission = 0o750 + cleanupWorkerFlag = "cleanup-worker-only" ) func main() { program.RunMain(func(ctx context.Context, siblingsGroup, dependenciesGroup program.Group) error { - if len(os.Args) != 2 { - return status.Error(codes.InvalidArgument, "Usage: bb_portal bb_portal.jsonnet") + configurationPath, runCleanupWorkerOnly, err := parseArguments(os.Args) + if err != nil { + return err } var configuration bb_portal.ApplicationConfiguration - if err := util.UnmarshalConfigurationFromFile(os.Args[1], &configuration); err != nil { - return util.StatusWrapf(err, "Failed to read configuration from %s", os.Args[1]) + if err := util.UnmarshalConfigurationFromFile(configurationPath, &configuration); err != nil { + return util.StatusWrapf(err, "Failed to read configuration from %s", configurationPath) } prometheusmetrics.RegisterMetrics() @@ -79,6 +83,16 @@ func main() { if tracerProvider == nil || reflect.ValueOf(tracerProvider).IsNil() { return status.Error(codes.Internal, "Otel tracer provider is nil") } + + if runCleanupWorkerOnly { + log.Println("Starting in cleanup worker only mode") + if err := startBuildEventStreamCleanupWorker(ctx, &configuration, dependenciesGroup, tracerProvider); err != nil { + return util.StatusWrap(err, "Failed to start BES cleanup worker") + } + lifecycleState.MarkReadyAndWait(siblingsGroup) + return nil + } + router := mux.NewRouter() 
router.Use(otelmux.Middleware("bb-portal-http", otelmux.WithTracerProvider(tracerProvider))) @@ -113,36 +127,56 @@ func main() { }) } -func newBuildEventStreamService( - configuration *bb_portal.ApplicationConfiguration, - siblingsGroup program.Group, - dependenciesGroup program.Group, - grpcClientFactory bb_grpc.ClientFactory, - router *mux.Router, - tracerProvider trace.TracerProvider, -) error { - besConfiguration := configuration.BesServiceConfiguration - if besConfiguration == nil { - log.Printf("Did not start BuildEventStream service because buildEventStreamConfiguration is not configured") - return nil +func parseArguments(args []string) (string, bool, error) { + flagSet := flag.NewFlagSet(args[0], flag.ContinueOnError) + flagSet.SetOutput(io.Discard) + runCleanupWorkerOnly := flagSet.Bool(cleanupWorkerFlag, false, "Run only the BES cleanup worker") + if err := flagSet.Parse(args[1:]); err != nil { + return "", false, status.Errorf(codes.InvalidArgument, "Failed to parse flags: %v", err) + } + if flagSet.NArg() != 1 { + return "", false, status.Error(codes.InvalidArgument, "Usage: bb_portal [--cleanup-worker-only] bb_portal.jsonnet") } + return flagSet.Arg(0), *runCleanupWorkerOnly, nil +} +func newBuildEventStreamDatabaseClient( + besConfiguration *bb_portal.BuildEventStreamService, + tracerProvider trace.TracerProvider, +) (database.Client, error) { dialect, connection, err := common.NewSQLConnectionFromConfiguration(besConfiguration.Database, tracerProvider) if err != nil { - return util.StatusWrap(err, "Failed to connect to database for BuildEventStreamService") + return nil, util.StatusWrap(err, "Failed to connect to database for BuildEventStreamService") } dbClient, err := database.New(dialect, connection) if err != nil { - return util.StatusWrap(err, "Failed to create database client from connection") + return nil, util.StatusWrap(err, "Failed to create database client from connection") } // Attempt to migrate towards ents model. 
if err = dbClient.Ent().Schema.Create(context.Background(), migrate.WithDropIndex(true)); err != nil { - return util.StatusWrap(err, "Could not automatically migrate to desired schema") + return nil, util.StatusWrap(err, "Could not automatically migrate to desired schema") + } + return dbClient, nil +} + +func startBuildEventStreamCleanupWorker( + ctx context.Context, + configuration *bb_portal.ApplicationConfiguration, + dependenciesGroup program.Group, + tracerProvider trace.TracerProvider, +) error { + besConfiguration := configuration.BesServiceConfiguration + if besConfiguration == nil { + return status.Error(codes.InvalidArgument, "BuildEventStreamService must be configured when running cleanup worker") + } + + dbClient, err := newBuildEventStreamDatabaseClient(besConfiguration, tracerProvider) + if err != nil { + return err } - // Configure the database cleanup service. cleanupConfiguration := besConfiguration.DatabaseCleanupConfiguration if cleanupConfiguration == nil { return status.Error(codes.InvalidArgument, "No databaseCleanupConfiguration configured for BuildEventStreamService") @@ -158,7 +192,34 @@ func newBuildEventStreamService( return util.StatusWrap(err, "Failed to create DatabaseCleanupService") } - databaseCleanerService.StartDbCleanupService(context.Background(), dependenciesGroup) + databaseCleanerService.StartDbCleanupService(ctx, dependenciesGroup) + return nil +} + +func newBuildEventStreamService( + configuration *bb_portal.ApplicationConfiguration, + siblingsGroup program.Group, + dependenciesGroup program.Group, + grpcClientFactory bb_grpc.ClientFactory, + router *mux.Router, + tracerProvider trace.TracerProvider, +) error { + besConfiguration := configuration.BesServiceConfiguration + if besConfiguration == nil { + log.Printf("Did not start BuildEventStream service because buildEventStreamConfiguration is not configured") + return nil + } + + dbClient, err := newBuildEventStreamDatabaseClient(besConfiguration, tracerProvider) + if err 
!= nil { + return err + } + + // Configure the database cleanup service. + cleanupConfiguration := besConfiguration.DatabaseCleanupConfiguration + if cleanupConfiguration == nil { + return status.Error(codes.InvalidArgument, "No databaseCleanupConfiguration configured for BuildEventStreamService") + } instanceNameAuthorizer, err := auth_configuration.DefaultAuthorizerFactory.NewAuthorizerFromConfiguration(configuration.InstanceNameAuthorizer, dependenciesGroup, grpcClientFactory) if err != nil { diff --git a/internal/database/dbcleanupservice/compact_logs.go b/internal/database/dbcleanupservice/compact_logs.go index 04959ad0..aecb3ca1 100644 --- a/internal/database/dbcleanupservice/compact_logs.go +++ b/internal/database/dbcleanupservice/compact_logs.go @@ -14,6 +14,8 @@ import ( "go.opentelemetry.io/otel/codes" ) +var compactLogsBatchSize int = 10 + func (dc *DbCleanupService) normalizeInvocation(ctx context.Context, invocation *ent.BazelInvocation) (err error) { ctx, span := dc.tracer.Start(ctx, "DbCleanupService.normalizeInvocation") defer func() { @@ -100,6 +102,8 @@ func (dc *DbCleanupService) CompactLogs(ctx context.Context) error { bazelinvocation.HasBuildLogChunks(), ), ). + Order(ent.Asc(bazelinvocation.FieldID)). + Limit(compactLogsBatchSize). 
All(ctx) if err != nil { span.RecordError(err) diff --git a/internal/database/dbcleanupservice/dbcleanupservice_test.go b/internal/database/dbcleanupservice/dbcleanupservice_test.go index cbe8cc27..70789f71 100644 --- a/internal/database/dbcleanupservice/dbcleanupservice_test.go +++ b/internal/database/dbcleanupservice/dbcleanupservice_test.go @@ -94,6 +94,10 @@ func TestCompactLogs(t *testing.T) { ctx = dbauthservice.NewContextWithDbAuthServiceBypass(ctx) clock := mock.NewMockClock(ctrl) traceProvider := noop.NewTracerProvider() + originalBatchSize := dbcleanupservice.CompactLogsBatchSizeForTest() + t.Cleanup(func() { + dbcleanupservice.SetCompactLogsBatchSizeForTest(originalBatchSize) + }) t.Run("FinishedInvocationWithoutIncompleteLog", func(t *testing.T) { db := testutils.SetupTestDB(t, dbProvider) @@ -195,6 +199,52 @@ func TestCompactLogs(t *testing.T) { require.NoError(t, err) requireIncompleteLogCount(t, client, 6) }) + + t.Run("ProcessesFixedBatchPerRun", func(t *testing.T) { + dbcleanupservice.SetCompactLogsBatchSizeForTest(1) + t.Cleanup(func() { + dbcleanupservice.SetCompactLogsBatchSizeForTest(originalBatchSize) + }) + db := testutils.SetupTestDB(t, dbProvider) + client := db.Ent() + instanceNameDbID := createInstanceName(t, ctx, client, "testInstance") + firstInvocation, err := client.BazelInvocation.Create(). + SetInvocationID(uuid.New()). + SetInstanceNameID(instanceNameDbID). + SetBepCompleted(true). + Save(ctx) + require.NoError(t, err) + secondInvocation, err := client.BazelInvocation.Create(). + SetInvocationID(uuid.New()). + SetInstanceNameID(instanceNameDbID). + SetBepCompleted(true). 
+ Save(ctx) + require.NoError(t, err) + + populateIncompleteBuildLog(t, ctx, client, firstInvocation.ID) + populateIncompleteBuildLog(t, ctx, client, secondInvocation.ID) + + cleanup, err := getNewDbCleanupService(db, clock, traceProvider) + require.NoError(t, err) + + err = cleanup.CompactLogs(ctx) + require.NoError(t, err) + + count, err := client.BazelInvocation.Query(). + Where(bazelinvocation.HasBuildLogChunks()). + Count(ctx) + require.NoError(t, err) + require.Equal(t, 1, count) + + err = cleanup.CompactLogs(ctx) + require.NoError(t, err) + + count, err = client.BazelInvocation.Query(). + Where(bazelinvocation.HasBuildLogChunks()). + Count(ctx) + require.NoError(t, err) + require.Equal(t, 2, count) + }) } func TestRemoveBuildsWithoutInvocations(t *testing.T) { diff --git a/internal/database/dbcleanupservice/export_test.go b/internal/database/dbcleanupservice/export_test.go new file mode 100644 index 00000000..05b097f8 --- /dev/null +++ b/internal/database/dbcleanupservice/export_test.go @@ -0,0 +1,11 @@ +package dbcleanupservice + +// SetCompactLogsBatchSizeForTest overrides the compaction batch size for tests. +func SetCompactLogsBatchSizeForTest(batchSize int) { + compactLogsBatchSize = batchSize +} + +// CompactLogsBatchSizeForTest returns the current compaction batch size for tests. +func CompactLogsBatchSizeForTest() int { + return compactLogsBatchSize +}