Skip to content

Commit ab6ff10

Browse files
committed
dbcleanup: add a dedicated cleanup worker mode
`bb-portal` currently starts the database cleanup loop inside every serving process. In a multi-replica deployment that multiplies cleanup work, makes compaction backlogs more expensive, and ties cleanup failures directly to the request-serving pods. Add a `--cleanup-worker-only` mode that starts only the BES cleanup loop. This lets deployments run cleanup in a separate singleton worker pod while leaving normal `bb-portal` replicas focused on serving traffic. Also put a small ordered batch limit on `CompactLogs()`. A single cleanup pass should not try to compact every eligible invocation in one shot when there is a large backlog.
1 parent 40eecb5 commit ab6ff10

5 files changed

Lines changed: 154 additions & 21 deletions

File tree

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ bazel run //cmd/bb_portal -- $PWD/config/portal.jsonnet
5454

5555
The backend runs a reverse proxy for the frontend.
5656

57+
To run the cleanup loop in a dedicated worker process instead of the serving
58+
process, start the same binary in cleanup-worker mode:
59+
60+
```
61+
bazel run //cmd/bb_portal -- --cleanup-worker-only $PWD/config/portal.jsonnet
62+
```
63+
5764
### Running the Frontend
5865

5966
From `./frontend`, run:

cmd/bb_portal/main.go

Lines changed: 82 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package main
22

33
import (
44
"context"
5+
"flag"
6+
"io"
57
"log"
68
"net/http"
79
"net/http/httputil"
@@ -56,16 +58,18 @@ import (
5658
const (
	// cleanupWorkerFlag is the name of the boolean command-line flag that
	// makes the binary run only the BES cleanup worker.
	cleanupWorkerFlag = "cleanup-worker-only"
	// folderPermission is a Unix permission mode (rwxr-x---).
	// NOTE(review): presumably applied to directories created by this
	// binary; the call site is not visible here.
	folderPermission = 0o750
	// readHeaderTimeout is a three second duration.
	// NOTE(review): presumably the HTTP server's ReadHeaderTimeout; confirm
	// against the server setup.
	readHeaderTimeout = 3 * time.Second
)
6063

6164
func main() {
6265
program.RunMain(func(ctx context.Context, siblingsGroup, dependenciesGroup program.Group) error {
63-
if len(os.Args) != 2 {
64-
return status.Error(codes.InvalidArgument, "Usage: bb_portal bb_portal.jsonnet")
66+
configurationPath, runCleanupWorkerOnly, err := parseArguments(os.Args)
67+
if err != nil {
68+
return err
6569
}
6670
var configuration bb_portal.ApplicationConfiguration
67-
if err := util.UnmarshalConfigurationFromFile(os.Args[1], &configuration); err != nil {
68-
return util.StatusWrapf(err, "Failed to read configuration from %s", os.Args[1])
71+
if err := util.UnmarshalConfigurationFromFile(configurationPath, &configuration); err != nil {
72+
return util.StatusWrapf(err, "Failed to read configuration from %s", configurationPath)
6973
}
7074

7175
prometheusmetrics.RegisterMetrics()
@@ -79,6 +83,16 @@ func main() {
7983
if tracerProvider == nil || reflect.ValueOf(tracerProvider).IsNil() {
8084
return status.Error(codes.Internal, "Otel tracer provider is nil")
8185
}
86+
87+
if runCleanupWorkerOnly {
88+
log.Println("Starting in cleanup worker only mode")
89+
if err := startBuildEventStreamCleanupWorker(ctx, &configuration, dependenciesGroup, tracerProvider); err != nil {
90+
return util.StatusWrap(err, "Failed to start BES cleanup worker")
91+
}
92+
lifecycleState.MarkReadyAndWait(siblingsGroup)
93+
return nil
94+
}
95+
8296
router := mux.NewRouter()
8397
router.Use(otelmux.Middleware("bb-portal-http", otelmux.WithTracerProvider(tracerProvider)))
8498

@@ -113,36 +127,56 @@ func main() {
113127
})
114128
}
115129

116-
func newBuildEventStreamService(
117-
configuration *bb_portal.ApplicationConfiguration,
118-
siblingsGroup program.Group,
119-
dependenciesGroup program.Group,
120-
grpcClientFactory bb_grpc.ClientFactory,
121-
router *mux.Router,
122-
tracerProvider trace.TracerProvider,
123-
) error {
124-
besConfiguration := configuration.BesServiceConfiguration
125-
if besConfiguration == nil {
126-
log.Printf("Did not start BuildEventStream service because buildEventStreamConfiguration is not configured")
127-
return nil
130+
func parseArguments(args []string) (string, bool, error) {
131+
flagSet := flag.NewFlagSet(args[0], flag.ContinueOnError)
132+
flagSet.SetOutput(io.Discard)
133+
runCleanupWorkerOnly := flagSet.Bool(cleanupWorkerFlag, false, "Run only the BES cleanup worker")
134+
if err := flagSet.Parse(args[1:]); err != nil {
135+
return "", false, status.Errorf(codes.InvalidArgument, "Failed to parse flags: %v", err)
136+
}
137+
if flagSet.NArg() != 1 {
138+
return "", false, status.Error(codes.InvalidArgument, "Usage: bb_portal [--cleanup-worker-only] bb_portal.jsonnet")
128139
}
140+
return flagSet.Arg(0), *runCleanupWorkerOnly, nil
141+
}
129142

143+
func newBuildEventStreamDatabaseClient(
144+
besConfiguration *bb_portal.BuildEventStreamService,
145+
tracerProvider trace.TracerProvider,
146+
) (database.Client, error) {
130147
dialect, connection, err := common.NewSQLConnectionFromConfiguration(besConfiguration.Database, tracerProvider)
131148
if err != nil {
132-
return util.StatusWrap(err, "Failed to connect to database for BuildEventStreamService")
149+
return nil, util.StatusWrap(err, "Failed to connect to database for BuildEventStreamService")
133150
}
134151

135152
dbClient, err := database.New(dialect, connection)
136153
if err != nil {
137-
return util.StatusWrap(err, "Failed to create database client from connection")
154+
return nil, util.StatusWrap(err, "Failed to create database client from connection")
138155
}
139156

140157
// Attempt to migrate towards ents model.
141158
if err = dbClient.Ent().Schema.Create(context.Background(), migrate.WithDropIndex(true)); err != nil {
142-
return util.StatusWrap(err, "Could not automatically migrate to desired schema")
159+
return nil, util.StatusWrap(err, "Could not automatically migrate to desired schema")
160+
}
161+
return dbClient, nil
162+
}
163+
164+
func startBuildEventStreamCleanupWorker(
165+
ctx context.Context,
166+
configuration *bb_portal.ApplicationConfiguration,
167+
dependenciesGroup program.Group,
168+
tracerProvider trace.TracerProvider,
169+
) error {
170+
besConfiguration := configuration.BesServiceConfiguration
171+
if besConfiguration == nil {
172+
return status.Error(codes.InvalidArgument, "BuildEventStreamService must be configured when running cleanup worker")
173+
}
174+
175+
dbClient, err := newBuildEventStreamDatabaseClient(besConfiguration, tracerProvider)
176+
if err != nil {
177+
return err
143178
}
144179

145-
// Configure the database cleanup service.
146180
cleanupConfiguration := besConfiguration.DatabaseCleanupConfiguration
147181
if cleanupConfiguration == nil {
148182
return status.Error(codes.InvalidArgument, "No databaseCleanupConfiguration configured for BuildEventStreamService")
@@ -158,7 +192,34 @@ func newBuildEventStreamService(
158192
return util.StatusWrap(err, "Failed to create DatabaseCleanupService")
159193
}
160194

161-
databaseCleanerService.StartDbCleanupService(context.Background(), dependenciesGroup)
195+
databaseCleanerService.StartDbCleanupService(ctx, dependenciesGroup)
196+
return nil
197+
}
198+
199+
func newBuildEventStreamService(
200+
configuration *bb_portal.ApplicationConfiguration,
201+
siblingsGroup program.Group,
202+
dependenciesGroup program.Group,
203+
grpcClientFactory bb_grpc.ClientFactory,
204+
router *mux.Router,
205+
tracerProvider trace.TracerProvider,
206+
) error {
207+
besConfiguration := configuration.BesServiceConfiguration
208+
if besConfiguration == nil {
209+
log.Printf("Did not start BuildEventStream service because buildEventStreamConfiguration is not configured")
210+
return nil
211+
}
212+
213+
dbClient, err := newBuildEventStreamDatabaseClient(besConfiguration, tracerProvider)
214+
if err != nil {
215+
return err
216+
}
217+
218+
// Configure the database cleanup service.
219+
cleanupConfiguration := besConfiguration.DatabaseCleanupConfiguration
220+
if cleanupConfiguration == nil {
221+
return status.Error(codes.InvalidArgument, "No databaseCleanupConfiguration configured for BuildEventStreamService")
222+
}
162223

163224
instanceNameAuthorizer, err := auth_configuration.DefaultAuthorizerFactory.NewAuthorizerFromConfiguration(configuration.InstanceNameAuthorizer, dependenciesGroup, grpcClientFactory)
164225
if err != nil {

internal/database/dbcleanupservice/compact_logs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"go.opentelemetry.io/otel/codes"
1515
)
1616

17+
// compactLogsBatchSize is the maximum number of invocations that a single
// CompactLogs query selects (it is passed to the query's Limit). It is a
// variable rather than a constant so that tests can override it.
var compactLogsBatchSize = 10
18+
1719
func (dc *DbCleanupService) normalizeInvocation(ctx context.Context, invocation *ent.BazelInvocation) (err error) {
1820
ctx, span := dc.tracer.Start(ctx, "DbCleanupService.normalizeInvocation")
1921
defer func() {
@@ -100,6 +102,8 @@ func (dc *DbCleanupService) CompactLogs(ctx context.Context) error {
100102
bazelinvocation.HasBuildLogChunks(),
101103
),
102104
).
105+
Order(ent.Asc(bazelinvocation.FieldID)).
106+
Limit(compactLogsBatchSize).
103107
All(ctx)
104108
if err != nil {
105109
span.RecordError(err)

internal/database/dbcleanupservice/dbcleanupservice_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ func TestCompactLogs(t *testing.T) {
9494
ctx = dbauthservice.NewContextWithDbAuthServiceBypass(ctx)
9595
clock := mock.NewMockClock(ctrl)
9696
traceProvider := noop.NewTracerProvider()
97+
originalBatchSize := dbcleanupservice.CompactLogsBatchSizeForTest()
98+
t.Cleanup(func() {
99+
dbcleanupservice.SetCompactLogsBatchSizeForTest(originalBatchSize)
100+
})
97101

98102
t.Run("FinishedInvocationWithoutIncompleteLog", func(t *testing.T) {
99103
db := testutils.SetupTestDB(t, dbProvider)
@@ -195,6 +199,52 @@ func TestCompactLogs(t *testing.T) {
195199
require.NoError(t, err)
196200
requireIncompleteLogCount(t, client, 6)
197201
})
202+
203+
t.Run("ProcessesFixedBatchPerRun", func(t *testing.T) {
204+
dbcleanupservice.SetCompactLogsBatchSizeForTest(1)
205+
t.Cleanup(func() {
206+
dbcleanupservice.SetCompactLogsBatchSizeForTest(originalBatchSize)
207+
})
208+
db := testutils.SetupTestDB(t, dbProvider)
209+
client := db.Ent()
210+
instanceNameDbID := createInstanceName(t, ctx, client, "testInstance")
211+
firstInvocation, err := client.BazelInvocation.Create().
212+
SetInvocationID(uuid.New()).
213+
SetInstanceNameID(instanceNameDbID).
214+
SetBepCompleted(true).
215+
Save(ctx)
216+
require.NoError(t, err)
217+
secondInvocation, err := client.BazelInvocation.Create().
218+
SetInvocationID(uuid.New()).
219+
SetInstanceNameID(instanceNameDbID).
220+
SetBepCompleted(true).
221+
Save(ctx)
222+
require.NoError(t, err)
223+
224+
populateIncompleteBuildLog(t, ctx, client, firstInvocation.ID)
225+
populateIncompleteBuildLog(t, ctx, client, secondInvocation.ID)
226+
227+
cleanup, err := getNewDbCleanupService(db, clock, traceProvider)
228+
require.NoError(t, err)
229+
230+
err = cleanup.CompactLogs(ctx)
231+
require.NoError(t, err)
232+
233+
count, err := client.BazelInvocation.Query().
234+
Where(bazelinvocation.HasBuildLogChunks()).
235+
Count(ctx)
236+
require.NoError(t, err)
237+
require.Equal(t, 1, count)
238+
239+
err = cleanup.CompactLogs(ctx)
240+
require.NoError(t, err)
241+
242+
count, err = client.BazelInvocation.Query().
243+
Where(bazelinvocation.HasBuildLogChunks()).
244+
Count(ctx)
245+
require.NoError(t, err)
246+
require.Equal(t, 2, count)
247+
})
198248
}
199249

200250
func TestRemoveBuildsWithoutInvocations(t *testing.T) {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package dbcleanupservice
2+
3+
// SetCompactLogsBatchSizeForTest overrides the compaction batch size for tests.
4+
func SetCompactLogsBatchSizeForTest(batchSize int) {
5+
compactLogsBatchSize = batchSize
6+
}
7+
8+
// CompactLogsBatchSizeForTest returns the current compaction batch size for tests.
9+
func CompactLogsBatchSizeForTest() int {
10+
return compactLogsBatchSize
11+
}

0 commit comments

Comments
 (0)