Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ bazel run //cmd/bb_portal -- $PWD/config/portal.jsonnet

The backend runs a reverse proxy for the frontend.

To run the cleanup loop in a dedicated worker process instead of the serving
process, start the same binary in cleanup-worker mode:

```
bazel run //cmd/bb_portal -- $PWD/config/portal.jsonnet --cleanup-worker-only
```

### Running the Frontend

From `./frontend`, run:
Expand Down
103 changes: 82 additions & 21 deletions cmd/bb_portal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"flag"
"io"
"log"
"net/http"
"net/http/httputil"
Expand Down Expand Up @@ -56,16 +58,18 @@ import (
const (
readHeaderTimeout = 3 * time.Second
folderPermission = 0o750
cleanupWorkerFlag = "cleanup-worker-only"
)

func main() {
program.RunMain(func(ctx context.Context, siblingsGroup, dependenciesGroup program.Group) error {
if len(os.Args) != 2 {
return status.Error(codes.InvalidArgument, "Usage: bb_portal bb_portal.jsonnet")
configurationPath, runCleanupWorkerOnly, err := parseArguments(os.Args)
if err != nil {
return err
}
var configuration bb_portal.ApplicationConfiguration
if err := util.UnmarshalConfigurationFromFile(os.Args[1], &configuration); err != nil {
return util.StatusWrapf(err, "Failed to read configuration from %s", os.Args[1])
if err := util.UnmarshalConfigurationFromFile(configurationPath, &configuration); err != nil {
return util.StatusWrapf(err, "Failed to read configuration from %s", configurationPath)
}

prometheusmetrics.RegisterMetrics()
Expand All @@ -79,6 +83,16 @@ func main() {
if tracerProvider == nil || reflect.ValueOf(tracerProvider).IsNil() {
return status.Error(codes.Internal, "Otel tracer provider is nil")
}

if runCleanupWorkerOnly {
log.Println("Starting in cleanup worker only mode")
if err := startBuildEventStreamCleanupWorker(ctx, &configuration, dependenciesGroup, tracerProvider); err != nil {
return util.StatusWrap(err, "Failed to start BES cleanup worker")
}
lifecycleState.MarkReadyAndWait(siblingsGroup)
return nil
}

router := mux.NewRouter()
router.Use(otelmux.Middleware("bb-portal-http", otelmux.WithTracerProvider(tracerProvider)))

Expand Down Expand Up @@ -113,36 +127,56 @@ func main() {
})
}

func newBuildEventStreamService(
configuration *bb_portal.ApplicationConfiguration,
siblingsGroup program.Group,
dependenciesGroup program.Group,
grpcClientFactory bb_grpc.ClientFactory,
router *mux.Router,
tracerProvider trace.TracerProvider,
) error {
besConfiguration := configuration.BesServiceConfiguration
if besConfiguration == nil {
log.Printf("Did not start BuildEventStream service because buildEventStreamConfiguration is not configured")
return nil
func parseArguments(args []string) (string, bool, error) {
flagSet := flag.NewFlagSet(args[0], flag.ContinueOnError)
flagSet.SetOutput(io.Discard)
runCleanupWorkerOnly := flagSet.Bool(cleanupWorkerFlag, false, "Run only the BES cleanup worker")
if err := flagSet.Parse(args[1:]); err != nil {
return "", false, status.Errorf(codes.InvalidArgument, "Failed to parse flags: %v", err)
}
if flagSet.NArg() != 1 {
return "", false, status.Error(codes.InvalidArgument, "Usage: bb_portal [--cleanup-worker-only] bb_portal.jsonnet")
}
return flagSet.Arg(0), *runCleanupWorkerOnly, nil
}

func newBuildEventStreamDatabaseClient(
besConfiguration *bb_portal.BuildEventStreamService,
tracerProvider trace.TracerProvider,
) (database.Client, error) {
dialect, connection, err := common.NewSQLConnectionFromConfiguration(besConfiguration.Database, tracerProvider)
if err != nil {
return util.StatusWrap(err, "Failed to connect to database for BuildEventStreamService")
return nil, util.StatusWrap(err, "Failed to connect to database for BuildEventStreamService")
}

dbClient, err := database.New(dialect, connection)
if err != nil {
return util.StatusWrap(err, "Failed to create database client from connection")
return nil, util.StatusWrap(err, "Failed to create database client from connection")
}

// Attempt to migrate towards ents model.
if err = dbClient.Ent().Schema.Create(context.Background(), migrate.WithDropIndex(true)); err != nil {
return util.StatusWrap(err, "Could not automatically migrate to desired schema")
return nil, util.StatusWrap(err, "Could not automatically migrate to desired schema")
}
return dbClient, nil
}

func startBuildEventStreamCleanupWorker(
ctx context.Context,
configuration *bb_portal.ApplicationConfiguration,
dependenciesGroup program.Group,
tracerProvider trace.TracerProvider,
) error {
besConfiguration := configuration.BesServiceConfiguration
if besConfiguration == nil {
return status.Error(codes.InvalidArgument, "BuildEventStreamService must be configured when running cleanup worker")
}

dbClient, err := newBuildEventStreamDatabaseClient(besConfiguration, tracerProvider)
if err != nil {
return err
}

// Configure the database cleanup service.
cleanupConfiguration := besConfiguration.DatabaseCleanupConfiguration
if cleanupConfiguration == nil {
return status.Error(codes.InvalidArgument, "No databaseCleanupConfiguration configured for BuildEventStreamService")
Expand All @@ -158,7 +192,34 @@ func newBuildEventStreamService(
return util.StatusWrap(err, "Failed to create DatabaseCleanupService")
}

databaseCleanerService.StartDbCleanupService(context.Background(), dependenciesGroup)
databaseCleanerService.StartDbCleanupService(ctx, dependenciesGroup)
return nil
}

func newBuildEventStreamService(
configuration *bb_portal.ApplicationConfiguration,
siblingsGroup program.Group,
dependenciesGroup program.Group,
grpcClientFactory bb_grpc.ClientFactory,
router *mux.Router,
tracerProvider trace.TracerProvider,
) error {
besConfiguration := configuration.BesServiceConfiguration
if besConfiguration == nil {
log.Printf("Did not start BuildEventStream service because buildEventStreamConfiguration is not configured")
return nil
}

dbClient, err := newBuildEventStreamDatabaseClient(besConfiguration, tracerProvider)
if err != nil {
return err
}

// Configure the database cleanup service.
cleanupConfiguration := besConfiguration.DatabaseCleanupConfiguration
if cleanupConfiguration == nil {
return status.Error(codes.InvalidArgument, "No databaseCleanupConfiguration configured for BuildEventStreamService")
}

instanceNameAuthorizer, err := auth_configuration.DefaultAuthorizerFactory.NewAuthorizerFromConfiguration(configuration.InstanceNameAuthorizer, dependenciesGroup, grpcClientFactory)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/database/dbcleanupservice/compact_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"go.opentelemetry.io/otel/codes"
)

var compactLogsBatchSize int = 10

func (dc *DbCleanupService) normalizeInvocation(ctx context.Context, invocation *ent.BazelInvocation) (err error) {
ctx, span := dc.tracer.Start(ctx, "DbCleanupService.normalizeInvocation")
defer func() {
Expand Down Expand Up @@ -100,6 +102,8 @@ func (dc *DbCleanupService) CompactLogs(ctx context.Context) error {
bazelinvocation.HasBuildLogChunks(),
),
).
Order(ent.Asc(bazelinvocation.FieldID)).
Limit(compactLogsBatchSize).
All(ctx)
if err != nil {
span.RecordError(err)
Expand Down
50 changes: 50 additions & 0 deletions internal/database/dbcleanupservice/dbcleanupservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func TestCompactLogs(t *testing.T) {
ctx = dbauthservice.NewContextWithDbAuthServiceBypass(ctx)
clock := mock.NewMockClock(ctrl)
traceProvider := noop.NewTracerProvider()
originalBatchSize := dbcleanupservice.CompactLogsBatchSizeForTest()
t.Cleanup(func() {
dbcleanupservice.SetCompactLogsBatchSizeForTest(originalBatchSize)
})

t.Run("FinishedInvocationWithoutIncompleteLog", func(t *testing.T) {
db := testutils.SetupTestDB(t, dbProvider)
Expand Down Expand Up @@ -195,6 +199,52 @@ func TestCompactLogs(t *testing.T) {
require.NoError(t, err)
requireIncompleteLogCount(t, client, 6)
})

t.Run("ProcessesFixedBatchPerRun", func(t *testing.T) {
dbcleanupservice.SetCompactLogsBatchSizeForTest(1)
t.Cleanup(func() {
dbcleanupservice.SetCompactLogsBatchSizeForTest(originalBatchSize)
})
db := testutils.SetupTestDB(t, dbProvider)
client := db.Ent()
instanceNameDbID := createInstanceName(t, ctx, client, "testInstance")
firstInvocation, err := client.BazelInvocation.Create().
SetInvocationID(uuid.New()).
SetInstanceNameID(instanceNameDbID).
SetBepCompleted(true).
Save(ctx)
require.NoError(t, err)
secondInvocation, err := client.BazelInvocation.Create().
SetInvocationID(uuid.New()).
SetInstanceNameID(instanceNameDbID).
SetBepCompleted(true).
Save(ctx)
require.NoError(t, err)

populateIncompleteBuildLog(t, ctx, client, firstInvocation.ID)
populateIncompleteBuildLog(t, ctx, client, secondInvocation.ID)

cleanup, err := getNewDbCleanupService(db, clock, traceProvider)
require.NoError(t, err)

err = cleanup.CompactLogs(ctx)
require.NoError(t, err)

count, err := client.BazelInvocation.Query().
Where(bazelinvocation.HasBuildLogChunks()).
Count(ctx)
require.NoError(t, err)
require.Equal(t, 1, count)

err = cleanup.CompactLogs(ctx)
require.NoError(t, err)

count, err = client.BazelInvocation.Query().
Where(bazelinvocation.HasBuildLogChunks()).
Count(ctx)
require.NoError(t, err)
require.Equal(t, 2, count)
})
}

func TestRemoveBuildsWithoutInvocations(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions internal/database/dbcleanupservice/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package dbcleanupservice

// SetCompactLogsBatchSizeForTest overrides the compaction batch size for tests.
func SetCompactLogsBatchSizeForTest(batchSize int) {
compactLogsBatchSize = batchSize
}

// CompactLogsBatchSizeForTest returns the current compaction batch size for tests.
func CompactLogsBatchSizeForTest() int {
return compactLogsBatchSize
}