From 538891eea1a39277b2ebaabd9536f2e59bfc3d06 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Wed, 19 Nov 2025 11:38:54 -0500 Subject: [PATCH 1/7] CBG-4932 improve ResyncManager test flakes - the problem seems to be that some tests take a long time to run, mostly after churn of the bucket pool to do query operations after resync finishes - clean up to use test functions for waiting --- ...ackground_mgr_attachment_migration_test.go | 10 +- db/background_mgr_resync_dcp.go | 5 +- db/background_mgr_resync_dcp_test.go | 209 +++++------------- db/database.go | 8 +- db/database_collection.go | 5 - db/database_test.go | 2 +- db/util_testing.go | 17 +- .../attachment_migration_test.go | 2 +- 8 files changed, 81 insertions(+), 177 deletions(-) diff --git a/db/background_mgr_attachment_migration_test.go b/db/background_mgr_attachment_migration_test.go index 7d583e06b7..8914ad3864 100644 --- a/db/background_mgr_attachment_migration_test.go +++ b/db/background_mgr_attachment_migration_test.go @@ -55,7 +55,7 @@ func TestAttachmentMigrationTaskMixMigratedAndNonMigratedDocs(t *testing.T) { require.NoError(t, err) // wait for task to complete - RequireBackgroundManagerState(t, ctx, attachMigrationMgr, BackgroundProcessStateCompleted) + RequireBackgroundManagerState(t, attachMigrationMgr, BackgroundProcessStateCompleted) // assert that the subset (5) of the docs were changed, all created docs were processed (10) stats := getAttachmentMigrationStats(t, attachMigrationMgr.Process) @@ -118,7 +118,7 @@ func TestAttachmentMigrationManagerResumeStoppedMigration(t *testing.T) { } }() - RequireBackgroundManagerState(t, ctx, attachMigrationMgr, BackgroundProcessStateStopped) + RequireBackgroundManagerState(t, attachMigrationMgr, BackgroundProcessStateStopped) stats := getAttachmentMigrationStats(t, attachMigrationMgr.Process) require.Less(t, stats.DocsProcessed, int64(4000)) @@ -132,7 +132,7 @@ func TestAttachmentMigrationManagerResumeStoppedMigration(t *testing.T) { err = 
attachMigrationMgr.Start(ctx, nil) require.NoError(t, err) - RequireBackgroundManagerState(t, ctx, attachMigrationMgr, BackgroundProcessStateCompleted) + RequireBackgroundManagerState(t, attachMigrationMgr, BackgroundProcessStateCompleted) stats = getAttachmentMigrationStats(t, attachMigrationMgr.Process) require.GreaterOrEqual(t, stats.DocsProcessed, int64(4000)) @@ -169,7 +169,7 @@ func TestAttachmentMigrationManagerNoDocsToMigrate(t *testing.T) { require.NoError(t, err) // wait for task to complete - RequireBackgroundManagerState(t, ctx, attachMigrationMgr, BackgroundProcessStateCompleted) + RequireBackgroundManagerState(t, attachMigrationMgr, BackgroundProcessStateCompleted) // assert that the two added docs above were processed but not changed stats := getAttachmentMigrationStats(t, attachMigrationMgr.Process) @@ -227,7 +227,7 @@ func TestMigrationManagerDocWithSyncAndGlobalAttachmentMetadata(t *testing.T) { require.NoError(t, err) // wait for task to complete - RequireBackgroundManagerState(t, ctx, attachMigrationMgr, BackgroundProcessStateCompleted) + RequireBackgroundManagerState(t, attachMigrationMgr, BackgroundProcessStateCompleted) // assert that the two added docs above were processed but not changed stats := getAttachmentMigrationStats(t, attachMigrationMgr.Process) diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index 9f1ed723cd..bf058b62ea 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -234,7 +234,10 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers for _, databaseCollection := range db.CollectionByID { collectionNames = append(collectionNames, databaseCollection.ScopeAndCollectionName()) } - db.invalidateAllPrincipals(ctx, collectionNames, endSeq) + err = db.invalidateAllPrincipals(ctx, collectionNames, endSeq) + if err != nil { + return fmt.Errorf("Could not invalid principal documents: %w", err) + } } diff --git 
a/db/background_mgr_resync_dcp_test.go b/db/background_mgr_resync_dcp_test.go index 0989d414c3..ba1bd294de 100644 --- a/db/background_mgr_resync_dcp_test.go +++ b/db/background_mgr_resync_dcp_test.go @@ -119,15 +119,11 @@ func TestResyncDCPInit(t *testing.T) { db, ctx := setupTestDB(t) defer db.Close(ctx) - resyncMgr := NewResyncManagerDCP(db.MetadataStore, base.TestUseXattrs(), db.MetadataKeys) - require.NotNil(t, resyncMgr) - db.ResyncManager = resyncMgr - defer func() { - _ = resyncMgr.Stop() + _ = db.ResyncManager.Stop() // this gets called by background manager in each Start call. // We have to manually call this for tests only to reset docsChanged/docsProcessed counters - resyncMgr.resetStatus() + db.ResyncManager.resetStatus() }() options := make(map[string]any) @@ -153,10 +149,10 @@ func TestResyncDCPInit(t *testing.T) { require.NoError(t, err) } - err = resyncMgr.Process.Init(ctx, options, clusterData) + err = db.ResyncManager.Process.Init(ctx, options, clusterData) require.NoError(t, err) - response := getResyncStats(resyncMgr.Process) + response := getResyncStats(t, db) assert.NotEmpty(t, response.ResyncID) if testCase.shouldCreateNewRun { @@ -181,44 +177,23 @@ func TestResyncManagerDCPStopInMidWay(t *testing.T) { db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, true) defer db.Close(ctx) - resyncMgr := NewResyncManagerDCP(db.MetadataStore, base.TestUseXattrs(), db.MetadataKeys) - - require.NotNil(t, resyncMgr) - db.ResyncManager = resyncMgr - options := map[string]any{ "database": db, "regenerateSequences": false, "collections": ResyncCollections{}, } - err := resyncMgr.Start(ctx, options) + err := db.ResyncManager.Start(ctx, options) require.NoError(t, err) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() - err = WaitForConditionWithOptions(t, func() bool { - stats := getResyncStats(resyncMgr.Process) - if stats.DocsProcessed > 300 { - err = resyncMgr.Stop() - require.NoError(t, err) - return true - } - return false - }, 2000, 10) - 
require.NoError(t, err) + waitForResyncDocsProcessed(t, db, 300) + require.NoError(t, db.ResyncManager.Stop()) }() - err = WaitForConditionWithOptions(t, func() bool { - var status BackgroundManagerStatus - rawStatus, _ := resyncMgr.GetStatus(ctx) - _ = json.Unmarshal(rawStatus, &status) - return status.State == BackgroundProcessStateStopped - }, 2000, 10) - require.NoError(t, err) - - stats := getResyncStats(resyncMgr.Process) + stats := waitForResyncState(t, db, BackgroundProcessStateStopped) assert.Less(t, stats.DocsProcessed, int64(docsToCreate), "DocsProcessed is equal to docs created. Consider setting docsToCreate > %d.", docsToCreate) assert.Less(t, stats.DocsChanged, int64(docsToCreate)) @@ -229,7 +204,6 @@ func TestResyncManagerDCPStopInMidWay(t *testing.T) { func TestResyncManagerDCPStart(t *testing.T) { - base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll) if base.UnitTestUrlIsWalrus() { t.Skip("Test requires Couchbase Server") } @@ -244,28 +218,14 @@ func TestResyncManagerDCPStart(t *testing.T) { scopeName := scopeAndCollectionName.ScopeName() collectionName := scopeAndCollectionName.CollectionName() - resyncMgr := NewResyncManagerDCP(db.MetadataStore, base.TestUseXattrs(), db.MetadataKeys) - - require.NotNil(t, resyncMgr) - db.ResyncManager = resyncMgr - options := map[string]any{ "database": db, "regenerateSequences": false, "collections": ResyncCollections{}, } - err := resyncMgr.Start(ctx, options) - require.NoError(t, err) + require.NoError(t, db.ResyncManager.Start(ctx, options)) + stats := waitForResyncState(t, db, BackgroundProcessStateCompleted) - err = WaitForConditionWithOptions(t, func() bool { - var status BackgroundManagerStatus - rawStatus, _ := resyncMgr.GetStatus(ctx) - _ = json.Unmarshal(rawStatus, &status) - return status.State == BackgroundProcessStateCompleted - }, 2000, 10) - require.NoError(t, err) - - stats := getResyncStats(resyncMgr.Process) assert.GreaterOrEqual(t, stats.DocsProcessed, int64(docsToCreate)) // may be 
processing tombstones from previous tests assert.Equal(t, int64(0), stats.DocsChanged) @@ -290,10 +250,7 @@ func TestResyncManagerDCPStart(t *testing.T) { scopeName := scopeAndCollectionName.ScopeName() collectionName := scopeAndCollectionName.CollectionName() - resyncMgr := NewResyncManagerDCP(db.MetadataStore, base.TestUseXattrs(), db.MetadataKeys) - require.NotNil(t, resyncMgr) - - initialStats := getResyncStats(resyncMgr.Process) + initialStats := getResyncStats(t, db) log.Printf("initialStats: processed[%v] changed[%v]", initialStats.DocsProcessed, initialStats.DocsChanged) options := map[string]any{ @@ -302,18 +259,12 @@ func TestResyncManagerDCPStart(t *testing.T) { "collections": ResyncCollections{}, } - err := resyncMgr.Start(ctx, options) + err := db.ResyncManager.Start(ctx, options) require.NoError(t, err) - err = WaitForConditionWithOptions(t, func() bool { - var status BackgroundManagerStatus - rawStatus, _ := resyncMgr.GetStatus(ctx) - _ = json.Unmarshal(rawStatus, &status) - return status.State == BackgroundProcessStateCompleted - }, 2000, 10) - require.NoError(t, err) + RequireBackgroundManagerState(t, db.ResyncManager, BackgroundProcessStateCompleted) - stats := getResyncStats(resyncMgr.Process) + stats := getResyncStats(t, db) // If there are tombstones from older docs which have been deleted from the bucket, processed docs will // be greater than DocsChanged assert.GreaterOrEqual(t, stats.DocsProcessed, int64(docsToCreate)) @@ -341,17 +292,13 @@ func TestResyncManagerDCPRunTwice(t *testing.T) { db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, false) defer db.Close(ctx) - resyncMgr := NewResyncManagerDCP(db.MetadataStore, base.TestUseXattrs(), db.MetadataKeys) - require.NotNil(t, resyncMgr) - db.ResyncManager = resyncMgr - options := map[string]any{ "database": db, "regenerateSequences": false, "collections": ResyncCollections{}, } - err := resyncMgr.Start(ctx, options) + err := db.ResyncManager.Start(ctx, options) require.NoError(t, err) 
wg := sync.WaitGroup{} @@ -359,26 +306,14 @@ func TestResyncManagerDCPRunTwice(t *testing.T) { // Attempt to Start running process go func() { defer wg.Done() - err := WaitForConditionWithOptions(t, func() bool { - stats := getResyncStats(resyncMgr.Process) - return stats.DocsProcessed > 100 - }, 100, 10) - require.NoError(t, err) + waitForResyncDocsProcessed(t, db, 100) - err = resyncMgr.Start(ctx, options) + err = db.ResyncManager.Start(ctx, options) require.Error(t, err) assert.Contains(t, err.Error(), "Process already running") }() - err = WaitForConditionWithOptions(t, func() bool { - var status BackgroundManagerStatus - rawStatus, _ := resyncMgr.GetStatus(ctx) - _ = json.Unmarshal(rawStatus, &status) - return status.State == BackgroundProcessStateCompleted - }, 2000, 10) - require.NoError(t, err) - - stats := getResyncStats(resyncMgr.Process) + stats := waitForResyncState(t, db, BackgroundProcessStateCompleted) // If there are tombstones from a previous test which have been deleted from the bucket, processed docs will // be greater than DocsChanged @@ -398,17 +333,13 @@ func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) { db, ctx := setupTestDBForResyncWithDocs(t, docsToCreate, true) defer db.Close(ctx) - resyncMgr := NewResyncManagerDCP(db.MetadataStore, base.TestUseXattrs(), db.MetadataKeys) - require.NotNil(t, resyncMgr) - db.ResyncManager = resyncMgr - options := map[string]any{ "database": db, "regenerateSequences": false, "collections": ResyncCollections{}, } - err := resyncMgr.Start(ctx, options) + err := db.ResyncManager.Start(ctx, options) require.NoError(t, err) // Attempt to Stop Process @@ -416,42 +347,21 @@ func TestResyncManagerDCPResumeStoppedProcess(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - for { - stats := getResyncStats(resyncMgr.Process) - if stats.DocsProcessed >= 2000 { - err = resyncMgr.Stop() - require.NoError(t, err) - break - } - time.Sleep(1 * time.Microsecond) - } + waitForResyncDocsProcessed(t, db, 2000) + 
require.NoError(t, db.ResyncManager.Stop()) }() - err = WaitForConditionWithOptions(t, func() bool { - var status BackgroundManagerStatus - rawStatus, _ := resyncMgr.GetStatus(ctx) - _ = json.Unmarshal(rawStatus, &status) - return status.State == BackgroundProcessStateStopped - }, 2000, 10) - require.NoError(t, err) + stats := waitForResyncState(t, db, BackgroundProcessStateStopped) - stats := getResyncStats(resyncMgr.Process) require.Less(t, stats.DocsProcessed, int64(docsToCreate), "DocsProcessed is equal to docs created. Consider setting docsToCreate > %d.", docsToCreate) assert.Less(t, stats.DocsChanged, int64(docsToCreate)) // Resume process - err = resyncMgr.Start(ctx, options) + err = db.ResyncManager.Start(ctx, options) require.NoError(t, err) - err = WaitForConditionWithOptions(t, func() bool { - var status BackgroundManagerStatus - rawStatus, _ := resyncMgr.GetStatus(ctx) - _ = json.Unmarshal(rawStatus, &status) - return status.State == BackgroundProcessStateCompleted - }, 2000, 10) - require.NoError(t, err) + stats = waitForResyncState(t, db, BackgroundProcessStateCompleted) - stats = getResyncStats(resyncMgr.Process) assert.GreaterOrEqual(t, stats.DocsProcessed, int64(docsToCreate)) assert.Equal(t, int64(docsToCreate), stats.DocsChanged) @@ -481,10 +391,6 @@ func TestResyncManagerDCPResumeStoppedProcessChangeCollections(t *testing.T) { db, ctx := SetupTestDBForBucketWithOptions(t, tb, dbOptions) defer db.Close(ctx) - resyncMgr := NewResyncManagerDCP(db.MetadataStore, base.TestUseXattrs(), db.MetadataKeys) - require.NotNil(t, resyncMgr) - db.ResyncManager = resyncMgr - dbCollections := make([]*DatabaseCollectionWithUser, numCollections) for i, scName := range db.DataStoreNames() { col, err := db.GetDatabaseCollectionWithUser(scName.ScopeName(), scName.CollectionName()) @@ -520,7 +426,7 @@ func TestResyncManagerDCPResumeStoppedProcessChangeCollections(t *testing.T) { }, } - err := resyncMgr.Start(ctx, options) + err := db.ResyncManager.Start(ctx, options) 
require.NoError(t, err) // Attempt to Stop Process @@ -529,9 +435,9 @@ func TestResyncManagerDCPResumeStoppedProcessChangeCollections(t *testing.T) { go func() { defer wg.Done() for { - stats := getResyncStats(resyncMgr.Process) + stats := getResyncStats(t, db) if stats.DocsProcessed >= 2000 { - err = resyncMgr.Stop() + err = db.ResyncManager.Stop() require.NoError(t, err) break } @@ -539,15 +445,8 @@ func TestResyncManagerDCPResumeStoppedProcessChangeCollections(t *testing.T) { } }() - err = WaitForConditionWithOptions(t, func() bool { - var status BackgroundManagerStatus - rawStatus, _ := resyncMgr.GetStatus(ctx) - _ = json.Unmarshal(rawStatus, &status) - return status.State == BackgroundProcessStateStopped - }, 2000, 10) - require.NoError(t, err) + stats := waitForResyncState(t, db, BackgroundProcessStateStopped) - stats := getResyncStats(resyncMgr.Process) require.Less(t, stats.DocsProcessed, int64(docsPerCollection), "DocsProcessed is equal to docs created. Consider setting docsPerCollection > %d.", docsPerCollection) assert.Less(t, stats.DocsChanged, int64(docsPerCollection)) @@ -562,19 +461,11 @@ func TestResyncManagerDCPResumeStoppedProcessChangeCollections(t *testing.T) { } // Resume process - err = resyncMgr.Start(ctx, options) + err = db.ResyncManager.Start(ctx, options) require.NoError(t, err) - err = WaitForConditionWithOptions(t, func() bool { - var status BackgroundManagerStatus - rawStatus, _ := resyncMgr.GetStatus(ctx) - _ = json.Unmarshal(rawStatus, &status) - t.Logf("Resync status: %s", rawStatus) - return status.State == BackgroundProcessStateCompleted - }, 2000, 10) - require.NoError(t, err) + stats = waitForResyncState(t, db, BackgroundProcessStateCompleted) - stats = getResyncStats(resyncMgr.Process) assert.GreaterOrEqual(t, stats.DocsProcessed, int64(totalDocCount)) assert.Equal(t, int64(totalDocCount), stats.DocsChanged+firstDocsChanged) @@ -709,10 +600,7 @@ func runResync(t *testing.T, ctx context.Context, db *Database, collection *Data _, 
err := collection.UpdateSyncFun(ctx, syncFn) require.NoError(t, err) - resyncMgr := NewResyncManagerDCP(db.MetadataStore, base.TestUseXattrs(), db.MetadataKeys) - require.NotNil(t, resyncMgr) - - initialStats := getResyncStats(resyncMgr.Process) + initialStats := getResyncStats(t, db) log.Printf("initialStats: processed[%v] changed[%v]", initialStats.DocsProcessed, initialStats.DocsChanged) options := map[string]any{ @@ -721,24 +609,29 @@ func runResync(t *testing.T, ctx context.Context, db *Database, collection *Data "collections": ResyncCollections{}, } - err = resyncMgr.Start(ctx, options) - require.NoError(t, err) - - require.EventuallyWithT(t, func(c *assert.CollectT) { - var status BackgroundManagerStatus - rawStatus, err := resyncMgr.GetStatus(ctx) - assert.NoError(c, err) - assert.NoError(c, json.Unmarshal(rawStatus, &status)) - assert.Equal(c, BackgroundProcessStateCompleted, status.State) - }, 40*time.Second, 200*time.Millisecond) - - return getResyncStats(resyncMgr.Process) + require.NoError(t, db.ResyncManager.Start(ctx, options)) + return waitForResyncState(t, db, BackgroundProcessStateCompleted) } // helper function to Unmarshal BackgroundProcess state into ResyncManagerResponseDCP -func getResyncStats(resyncManager BackgroundManagerProcessI) ResyncManagerResponseDCP { +func getResyncStats(t testing.TB, db *Database) ResyncManagerResponseDCP { var resp ResyncManagerResponseDCP - rawStatus, _, _ := resyncManager.GetProcessStatus(BackgroundManagerStatus{}) - _ = json.Unmarshal(rawStatus, &resp) + rawStatus, _, err := db.ResyncManager.Process.GetProcessStatus(BackgroundManagerStatus{}) + require.NoError(t, err) + require.NoError(t, json.Unmarshal(rawStatus, &resp)) return resp } + +// waitForResyncState waits for the resync manager to reach the desired state, and then returns the status. 
+func waitForResyncState(t testing.TB, db *Database, desiredState BackgroundProcessState) ResyncManagerResponseDCP { + RequireBackgroundManagerState(t, db.ResyncManager, desiredState) + return getResyncStats(t, db) +} + +// waitForResyncDocsProcessed waits until the resync manager has processed more than the specified count of documents. +func waitForResyncDocsProcessed(t testing.TB, db *Database, count int64) { + require.EventuallyWithT(t, func(c *assert.CollectT) { + stats := getResyncStats(t, db) + assert.Greater(c, stats.DocsProcessed, count) + }, 10*time.Second, 1*time.Millisecond) +} diff --git a/db/database.go b/db/database.go index be32d2aff3..d0a166e97d 100644 --- a/db/database.go +++ b/db/database.go @@ -1916,15 +1916,19 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid, } // invalidateAllPrincipals invalidates computed channels and roles for all users/roles, for the specified collections: -func (dbCtx *DatabaseContext) invalidateAllPrincipals(ctx context.Context, collectionNames base.ScopeAndCollectionNames, endSeq uint64) { +func (dbCtx *DatabaseContext) invalidateAllPrincipals(ctx context.Context, collectionNames base.ScopeAndCollectionNames, endSeq uint64) error { base.InfofCtx(ctx, base.KeyAll, "Invalidating channel caches of users/roles...") - users, roles, _ := dbCtx.AllPrincipalIDs(ctx) + users, roles, err := dbCtx.AllPrincipalIDs(ctx) + if err != nil { + return err + } for _, name := range users { dbCtx.invalUserRolesAndChannels(ctx, name, collectionNames, endSeq) } for _, name := range roles { dbCtx.invalRoleChannels(ctx, name, collectionNames, endSeq) } + return nil } // invalUserChannels invalidates a user's computed channels for the specified collections diff --git a/db/database_collection.go b/db/database_collection.go index daa77de851..611924563c 100644 --- a/db/database_collection.go +++ b/db/database_collection.go @@ -428,11 +428,6 @@ func (c *DatabaseCollection) invalRoleChannels(ctx context.Context, 
rolename str c.dbCtx.invalRoleChannels(ctx, rolename, base.ScopeAndCollectionNames{c.ScopeAndCollectionName()}, invalSeq) } -// invalidateAllPrincipals invalidates computed channels and roles for collection c, for all users and roles -func (c *DatabaseCollection) invalidateAllPrincipals(ctx context.Context, endSeq uint64) { - c.dbCtx.invalidateAllPrincipals(ctx, base.ScopeAndCollectionNames{c.ScopeAndCollectionName()}, endSeq) -} - func (c *DatabaseCollection) useMou() bool { return c.dbCtx.UseMou() } diff --git a/db/database_test.go b/db/database_test.go index cc10be8878..6ec455298e 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -3590,7 +3590,7 @@ func Test_invalidateAllPrincipalsCache(t *testing.T) { assert.NoError(t, err) assert.Greater(t, endSeq, uint64(0)) - collection.invalidateAllPrincipals(ctx, endSeq) + require.NoError(t, db.invalidateAllPrincipals(ctx, base.ScopeAndCollectionNames{sgbucket.DataStoreNameImpl{Scope: collection.ScopeName, Collection: collection.Name}}, endSeq)) err = collection.WaitForPendingChanges(ctx) assert.NoError(t, err) diff --git a/db/util_testing.go b/db/util_testing.go index 3926ee25e6..bc951dd4df 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -935,14 +935,23 @@ func MoveAttachmentXattrFromGlobalToSync(t *testing.T, dataStore base.DataStore, // RequireBackgroundManagerState waits for a BackgroundManager to reach a given state within 10 seconds or fails test // harness. 
-func RequireBackgroundManagerState(t testing.TB, ctx context.Context, mgr *BackgroundManager, expState BackgroundProcessState) { +func RequireBackgroundManagerState(t testing.TB, mgr *BackgroundManager, expState BackgroundProcessState) BackgroundManagerStatus { + waitTime := 10 * time.Second + if !base.UnitTestUrlIsWalrus() { + // Increase wait time for CI tests against Couchbase Server with GSI disabled (views), some queries take a + // longer time to run + waitTime = 30 * time.Second + } + ctx := base.TestCtx(t) + var status *BackgroundManagerStatus require.EventuallyWithT(t, func(c *assert.CollectT) { - var status BackgroundManagerStatus + status = nil rawStatus, err := mgr.GetStatus(ctx) assert.NoError(c, err) assert.NoError(c, base.JSONUnmarshal(rawStatus, &status)) - assert.Equal(c, expState, status.State) - }, time.Second*10, time.Millisecond*100) + assert.Equal(c, expState, status.State, "BackgroundManager did not reach expected state in %d seconds. Current status: %s", int(waitTime.Seconds()), string(rawStatus)) + }, waitTime, time.Millisecond*100) + return *status } // AssertSyncInfoMetaVersion will assert that meta version is equal to current product version diff --git a/rest/attachmentmigrationtest/attachment_migration_test.go b/rest/attachmentmigrationtest/attachment_migration_test.go index f623c36d94..54f1041477 100644 --- a/rest/attachmentmigrationtest/attachment_migration_test.go +++ b/rest/attachmentmigrationtest/attachment_migration_test.go @@ -385,6 +385,6 @@ func getAttachmentMigrationManagerStatus(rt *rest.RestTester) db.AttachmentMigra // waitForAttachmentMigrationState waits for the AttachmentMigrationManager to reach the expected state and then returns // its status. 
func waitForAttachmentMigrationState(rt *rest.RestTester, expectedState db.BackgroundProcessState) db.AttachmentMigrationManagerResponse { - db.RequireBackgroundManagerState(rt.TB(), rt.Context(), rt.GetDatabase().AttachmentMigrationManager, expectedState) + db.RequireBackgroundManagerState(rt.TB(), rt.GetDatabase().AttachmentMigrationManager, expectedState) return getAttachmentMigrationManagerStatus(rt) } From df23f8499a28dbe9b2407d0c144e2e773db5d343 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Thu, 20 Nov 2025 09:41:19 -0500 Subject: [PATCH 2/7] Update db/background_mgr_resync_dcp.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- db/background_mgr_resync_dcp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index bf058b62ea..643182f553 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -236,7 +236,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers } err = db.invalidateAllPrincipals(ctx, collectionNames, endSeq) if err != nil { - return fmt.Errorf("Could not invalid principal documents: %w", err) + return fmt.Errorf("Could not invalidate principal documents: %w", err) } } From 60b535d72719c11f1e1d2b987b166cabbae11eea Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Thu, 20 Nov 2025 10:11:02 -0500 Subject: [PATCH 3/7] Update comment --- db/util_testing.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/util_testing.go b/db/util_testing.go index bc951dd4df..ef84360e1e 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -938,8 +938,8 @@ func MoveAttachmentXattrFromGlobalToSync(t *testing.T, dataStore base.DataStore, func RequireBackgroundManagerState(t testing.TB, mgr *BackgroundManager, expState BackgroundProcessState) BackgroundManagerStatus { waitTime := 10 * time.Second if !base.UnitTestUrlIsWalrus() { - // Increase wait time for CI tests against 
Couchbase Server with GSI disabled (views), some queries take a - // longer time to run + // Increase wait time for CI tests against Couchbase Server, they can take longer to run. + // Generally everything runs in 10 seconds, but when it does not, it is not worth flagging the failures. waitTime = 30 * time.Second } ctx := base.TestCtx(t) From 6c4ad6a6229d55720d3d7d239c5202336d4d3822 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Thu, 20 Nov 2025 10:11:57 -0500 Subject: [PATCH 4/7] Update comment again --- db/util_testing.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/db/util_testing.go b/db/util_testing.go index ef84360e1e..a8645429b6 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -933,8 +933,7 @@ func MoveAttachmentXattrFromGlobalToSync(t *testing.T, dataStore base.DataStore, require.NoError(t, err) } -// RequireBackgroundManagerState waits for a BackgroundManager to reach a given state within 10 seconds or fails test -// harness. +// RequireBackgroundManagerState waits for a BackgroundManager to reach a given state or fails test harness. 
func RequireBackgroundManagerState(t testing.TB, mgr *BackgroundManager, expState BackgroundProcessState) BackgroundManagerStatus { waitTime := 10 * time.Second if !base.UnitTestUrlIsWalrus() { From 645deec14e3b652a3f6083d9b312211e10ac7551 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 24 Nov 2025 09:08:43 -0500 Subject: [PATCH 5/7] Wait for heartbeat removal, since you can not restart a process until heartbeat is removed --- db/background_mgr.go | 21 --------- db/util_testing.go | 21 ++++++++- .../collections_admin_api_test.go | 7 +-- .../attachment_compaction_api_test.go | 46 +------------------ rest/utilities_testing.go | 21 --------- rest/utilities_testing_attachment.go | 5 ++ rest/utilities_testing_resttester.go | 7 +-- 7 files changed, 29 insertions(+), 99 deletions(-) diff --git a/db/background_mgr.go b/db/background_mgr.go index e6994eb732..e574d4373c 100644 --- a/db/background_mgr.go +++ b/db/background_mgr.go @@ -10,10 +10,8 @@ package db import ( "context" - "fmt" "net/http" "sync" - "testing" "time" sgbucket "github.com/couchbase/sg-bucket" @@ -408,25 +406,6 @@ func (b *BackgroundManager) GetRunState() BackgroundProcessState { return b.State } -// For test use only -// Returns empty string if background process is not cluster aware -func (b *BackgroundManager) GetHeartbeatDocID(t testing.TB) string { - if b.isClusterAware() { - return b.clusterAwareOptions.HeartbeatDocID() - } - return "" -} - -// For test use only -// Returns error if background process is not cluster aware -func (b *BackgroundManager) GetHeartbeatDoc(t testing.TB) ([]byte, error) { - if b.isClusterAware() { - b, _, err := b.clusterAwareOptions.metadataStore.GetRaw(b.GetHeartbeatDocID(t)) - return b, err - } - return nil, fmt.Errorf("background process is not cluster aware") -} - func (b *BackgroundManager) SetError(err error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/db/util_testing.go b/db/util_testing.go index a8645429b6..cea5e183b3 100644 --- a/db/util_testing.go +++ 
b/db/util_testing.go @@ -933,6 +933,23 @@ func MoveAttachmentXattrFromGlobalToSync(t *testing.T, dataStore base.DataStore, require.NoError(t, err) } +// WaitForBackgroundManagerHeartbeatDocRemoval waits for removal of heartbeat document or fails the test harness. +// +// A background manager state transition to completed, stopped, or error is followed by immediate removal of the +// heartbeat document. When restarting a background manager, the state of the heartbeat document is checked, allowing +// for a small race if you try to stop and immediately restart a background manager. +func WaitForBackgroundManagerHeartbeatDocRemoval(t testing.TB, mgr *BackgroundManager) { + if !mgr.isClusterAware() { + return + } + + require.EventuallyWithT(t, func(c *assert.CollectT) { + exists, ok := mgr.clusterAwareOptions.metadataStore.Exists(mgr.clusterAwareOptions.HeartbeatDocID()) + require.NoError(t, ok) + assert.False(c, exists, "BackgroundManager heartbeat document was not removed in expected time") + }, 10*time.Second, 10*time.Millisecond) +} + // RequireBackgroundManagerState waits for a BackgroundManager to reach a given state or fails test harness. func RequireBackgroundManagerState(t testing.TB, mgr *BackgroundManager, expState BackgroundProcessState) BackgroundManagerStatus { waitTime := 10 * time.Second @@ -949,7 +966,9 @@ func RequireBackgroundManagerState(t testing.TB, mgr *BackgroundManager, expStat assert.NoError(c, err) assert.NoError(c, base.JSONUnmarshal(rawStatus, &status)) assert.Equal(c, expState, status.State, "BackgroundManager did not reach expected state in %d seconds. 
Current status: %s", int(waitTime.Seconds()), string(rawStatus)) - }, waitTime, time.Millisecond*100) + }, waitTime, time.Millisecond*10) + + WaitForBackgroundManagerHeartbeatDocRemoval(t, mgr) return *status } diff --git a/rest/adminapitest/collections_admin_api_test.go b/rest/adminapitest/collections_admin_api_test.go index d5416ad8e5..d2a18f5136 100644 --- a/rest/adminapitest/collections_admin_api_test.go +++ b/rest/adminapitest/collections_admin_api_test.go @@ -281,12 +281,7 @@ func TestRequireResync(t *testing.T) { resp = rt.SendAdminRequest("POST", "/"+db2Name+"/_resync?action=start&regenerate_sequences=true", string(resyncPayload)) rest.RequireStatus(t, resp, http.StatusOK) - db2, err := rt.ServerContext().GetDatabase(rt.Context(), db2Name) - require.NoError(t, err) - rest.WaitAndAssertBackgroundManagerState(t, db.BackgroundProcessStateCompleted, - func(t testing.TB) db.BackgroundProcessState { - return db2.ResyncManager.GetRunState() - }) + rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) resp = rt.SendAdminRequest("GET", "/_all_dbs?verbose=true", "") rest.RequireStatus(t, resp, http.StatusOK) diff --git a/rest/attachmentcompactiontest/attachment_compaction_api_test.go b/rest/attachmentcompactiontest/attachment_compaction_api_test.go index 3da8768f1f..8a007fb579 100644 --- a/rest/attachmentcompactiontest/attachment_compaction_api_test.go +++ b/rest/attachmentcompactiontest/attachment_compaction_api_test.go @@ -13,7 +13,6 @@ import ( "net/http" "strconv" "testing" - "time" "github.com/couchbase/gocbcore/v10" "github.com/couchbase/sync_gateway/base" @@ -59,20 +58,7 @@ func TestAttachmentCompactionAPI(t *testing.T) { resp = rt.SendAdminRequest("POST", "/{{.db}}/_compact?type=attachment", "") rest.RequireStatus(t, resp, http.StatusServiceUnavailable) - // Wait for run to complete - err = rt.WaitForCondition(func() bool { - time.Sleep(1 * time.Second) - - resp := rt.SendAdminRequest("GET", "/{{.db}}/_compact?type=attachment", "") - 
rest.RequireStatus(t, resp, http.StatusOK) - - var response db.AttachmentManagerResponse - err = base.JSONUnmarshal(resp.BodyBytes(), &response) - require.NoError(t, err) - - return response.State == db.BackgroundProcessStateCompleted - }) - require.NoError(t, err) + rt.WaitForAttachmentCompactionStatus(t, db.BackgroundProcessStateCompleted) dataStore := rt.GetSingleDataStore() collection, ctx := rt.GetSingleTestDatabaseCollectionWithUser() @@ -102,20 +88,7 @@ func TestAttachmentCompactionAPI(t *testing.T) { resp = rt.SendAdminRequest("POST", "/{{.db}}/_compact?type=attachment", "") rest.RequireStatus(t, resp, http.StatusOK) - // Wait for run to complete - err = rt.WaitForCondition(func() bool { - time.Sleep(1 * time.Second) - - resp := rt.SendAdminRequest("GET", "/{{.db}}/_compact?type=attachment", "") - rest.RequireStatus(t, resp, http.StatusOK) - - var response db.AttachmentManagerResponse - err = base.JSONUnmarshal(resp.BodyBytes(), &response) - require.NoError(t, err) - - return response.State == db.BackgroundProcessStateCompleted - }) - require.NoError(t, err) + rt.WaitForAttachmentCompactionStatus(t, db.BackgroundProcessStateCompleted) // Validate results of GET resp = rt.SendAdminRequest("GET", "/{{.db}}/_compact?type=attachment", "") @@ -136,21 +109,6 @@ func TestAttachmentCompactionAPI(t *testing.T) { resp = rt.SendAdminRequest("POST", "/{{.db}}/_compact?type=attachment&action=stop", "") rest.RequireStatus(t, resp, http.StatusOK) - // Verify it has been marked as 'stopping' --> its possible we'll get stopped instead based on timing of persisted doc update - err = rt.WaitForCondition(func() bool { - time.Sleep(1 * time.Second) - - resp := rt.SendAdminRequest("GET", "/{{.db}}/_compact?type=attachment", "") - rest.RequireStatus(t, resp, http.StatusOK) - - var response db.AttachmentManagerResponse - err = base.JSONUnmarshal(resp.BodyBytes(), &response) - require.NoError(t, err) - - return response.State == db.BackgroundProcessStateStopping || response.State 
== db.BackgroundProcessStateStopped - }) - require.NoError(t, err) - // Wait for run to complete _ = rt.WaitForAttachmentCompactionStatus(t, db.BackgroundProcessStateStopped) } diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 9dca79b4dc..16cf0b105d 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -2318,27 +2318,6 @@ func WaitAndAssertConditionTimeout(t *testing.T, timeout time.Duration, fn func( } } -func WaitAndAssertBackgroundManagerState(t testing.TB, expected db.BackgroundProcessState, getStateFunc func(t testing.TB) db.BackgroundProcessState) bool { - t.Helper() - err, actual := base.RetryLoop(base.TestCtx(t), t.Name()+"-WaitAndAssertBackgroundManagerState", func() (shouldRetry bool, err error, value db.BackgroundProcessState) { - actual := getStateFunc(t) - return expected != actual, nil, actual - }, base.CreateMaxDoublingSleeperFunc(30, 100, 1000)) - return assert.NoErrorf(t, err, "expected background manager state %v, but got: %v", expected, actual) -} - -func WaitAndAssertBackgroundManagerExpiredHeartbeat(t testing.TB, bm *db.BackgroundManager) bool { - t.Helper() - err, b := base.RetryLoop(base.TestCtx(t), t.Name()+"-assertNoHeartbeatDoc", func() (shouldRetry bool, err error, value []byte) { - b, err := bm.GetHeartbeatDoc(t) - return !base.IsDocNotFoundError(err), err, b - }, base.CreateMaxDoublingSleeperFunc(30, 100, 1000)) - if b != nil { - return assert.NoErrorf(t, err, "expected heartbeat doc to expire, but found one with contents: %s", b) - } - return assert.Truef(t, base.IsDocNotFoundError(err), "expected heartbeat doc to expire, but got a different error: %v", err) -} - type DocVersion = db.DocVersion // RequireDocVersionNotNil calls t.Fail if two document version is not specified. 
diff --git a/rest/utilities_testing_attachment.go b/rest/utilities_testing_attachment.go index d4cea3f36f..a2a1553c5c 100644 --- a/rest/utilities_testing_attachment.go +++ b/rest/utilities_testing_attachment.go @@ -11,6 +11,7 @@ package rest import ( "context" "net/http" + "slices" "testing" "time" @@ -97,6 +98,10 @@ func (rt *RestTester) WaitForAttachmentMigrationStatus(t *testing.T, state db.Ba require.NoError(c, err) assert.Equal(c, state, response.State) }, time.Second*20, time.Millisecond*100) + if !slices.Contains([]db.BackgroundProcessState{db.BackgroundProcessStateRunning, db.BackgroundProcessStateStopping}, state) { + db.WaitForBackgroundManagerHeartbeatDocRemoval(t, rt.GetDatabase().AttachmentMigrationManager) + return response + } return response } diff --git a/rest/utilities_testing_resttester.go b/rest/utilities_testing_resttester.go index 7912e48154..79d8775bff 100644 --- a/rest/utilities_testing_resttester.go +++ b/rest/utilities_testing_resttester.go @@ -13,7 +13,6 @@ import ( "fmt" "net/http" "net/url" - "slices" "strings" "sync/atomic" "testing" @@ -410,12 +409,8 @@ func (rt *RestTester) WaitForResyncDCPStatus(status db.BackgroundProcessState) d require.NoError(rt.TB(), json.Unmarshal(response.BodyBytes(), &resyncStatus)) assert.Equal(c, status, resyncStatus.State) - if slices.Contains([]db.BackgroundProcessState{db.BackgroundProcessStateCompleted, db.BackgroundProcessStateStopped}, status) { - var output any - _, err := rt.Bucket().DefaultDataStore().Get(rt.GetDatabase().ResyncManager.GetHeartbeatDocID(rt.TB()), output) - assert.True(c, base.IsDocNotFoundError(err), "expected heartbeat doc to be deleted, got: %v", err) - } }, time.Second*10, time.Millisecond*10) + db.WaitForBackgroundManagerHeartbeatDocRemoval(rt.TB(), rt.GetDatabase().ResyncManager) return resyncStatus } From 4d4c4990e68e8786a4a09363310266733da2c5b0 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 24 Nov 2025 09:17:42 -0500 Subject: [PATCH 6/7] add wait on status transitions 
--- db/background_mgr_resync_dcp_test.go | 1 + rest/utilities_testing_attachment.go | 4 ++-- rest/utilities_testing_resttester.go | 5 ++++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/db/background_mgr_resync_dcp_test.go b/db/background_mgr_resync_dcp_test.go index ba1bd294de..105e4ecd2a 100644 --- a/db/background_mgr_resync_dcp_test.go +++ b/db/background_mgr_resync_dcp_test.go @@ -630,6 +630,7 @@ func waitForResyncState(t testing.TB, db *Database, desiredState BackgroundProce // waitForResyncDocsProcessed waits until the resync manager has processed more than the specified count of documents. func waitForResyncDocsProcessed(t testing.TB, db *Database, count int64) { + // this intentionally uses a very short poll interval to catch progress as quickly as possible require.EventuallyWithT(t, func(c *assert.CollectT) { stats := getResyncStats(t, db) assert.Greater(c, stats.DocsProcessed, count) diff --git a/rest/utilities_testing_attachment.go b/rest/utilities_testing_attachment.go index a2a1553c5c..d95044a51c 100644 --- a/rest/utilities_testing_attachment.go +++ b/rest/utilities_testing_attachment.go @@ -98,10 +98,10 @@ func (rt *RestTester) WaitForAttachmentMigrationStatus(t *testing.T, state db.Ba require.NoError(c, err) assert.Equal(c, state, response.State) }, time.Second*20, time.Millisecond*100) + // Wait for heartbeat doc removal if the state change will result in its removal. Allows calling start + // immediately after db.BackgroundProcessStateStopped without error. 
if !slices.Contains([]db.BackgroundProcessState{db.BackgroundProcessStateRunning, db.BackgroundProcessStateStopping}, state) { db.WaitForBackgroundManagerHeartbeatDocRemoval(t, rt.GetDatabase().AttachmentMigrationManager) - return response } - return response } diff --git a/rest/utilities_testing_resttester.go b/rest/utilities_testing_resttester.go index 79d8775bff..b2862290d0 100644 --- a/rest/utilities_testing_resttester.go +++ b/rest/utilities_testing_resttester.go @@ -13,6 +13,7 @@ import ( "fmt" "net/http" "net/url" + "slices" "strings" "sync/atomic" "testing" @@ -410,7 +411,9 @@ func (rt *RestTester) WaitForResyncDCPStatus(status db.BackgroundProcessState) d assert.Equal(c, status, resyncStatus.State) }, time.Second*10, time.Millisecond*10) - db.WaitForBackgroundManagerHeartbeatDocRemoval(rt.TB(), rt.GetDatabase().ResyncManager) + if !slices.Contains([]db.BackgroundProcessState{db.BackgroundProcessStateRunning, db.BackgroundProcessStateStopping}, status) { + db.WaitForBackgroundManagerHeartbeatDocRemoval(rt.TB(), rt.GetDatabase().ResyncManager) + } return resyncStatus } From a92022278e79514a15c85326e0311000093d7344 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 24 Nov 2025 13:50:05 -0500 Subject: [PATCH 7/7] fix some failures --- db/util_testing.go | 4 +++- rest/adminapitest/collections_admin_api_test.go | 5 +++-- rest/attachmentmigrationtest/attachment_migration_test.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/db/util_testing.go b/db/util_testing.go index cea5e183b3..277c0db9bd 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -968,7 +968,9 @@ func RequireBackgroundManagerState(t testing.TB, mgr *BackgroundManager, expStat assert.Equal(c, expState, status.State, "BackgroundManager did not reach expected state in %d seconds. 
Current status: %s", int(waitTime.Seconds()), string(rawStatus)) }, waitTime, time.Millisecond*10) - WaitForBackgroundManagerHeartbeatDocRemoval(t, mgr) + if slices.Contains([]BackgroundProcessState{BackgroundProcessStateCompleted, BackgroundProcessStateStopped, BackgroundProcessStateError}, expState) { + WaitForBackgroundManagerHeartbeatDocRemoval(t, mgr) + } return *status } diff --git a/rest/adminapitest/collections_admin_api_test.go b/rest/adminapitest/collections_admin_api_test.go index d2a18f5136..d54ab81320 100644 --- a/rest/adminapitest/collections_admin_api_test.go +++ b/rest/adminapitest/collections_admin_api_test.go @@ -280,8 +280,9 @@ func TestRequireResync(t *testing.T) { resp = rt.SendAdminRequest("POST", "/"+db2Name+"/_resync?action=start&regenerate_sequences=true", string(resyncPayload)) rest.RequireStatus(t, resp, http.StatusOK) - - rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) + db2, err := rt.ServerContext().GetDatabase(rt.Context(), db2Name) + require.NoError(t, err) + db.RequireBackgroundManagerState(t, db2.ResyncManager, db.BackgroundProcessStateCompleted) resp = rt.SendAdminRequest("GET", "/_all_dbs?verbose=true", "") rest.RequireStatus(t, resp, http.StatusOK) diff --git a/rest/attachmentmigrationtest/attachment_migration_test.go b/rest/attachmentmigrationtest/attachment_migration_test.go index 54f1041477..1ee667de0c 100644 --- a/rest/attachmentmigrationtest/attachment_migration_test.go +++ b/rest/attachmentmigrationtest/attachment_migration_test.go @@ -121,7 +121,7 @@ func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) { dbCtx = rt.GetDatabase() scNames = append(scNames, base.ScopeAndCollectionName{Scope: scope, Collection: collection2}) - assert.ElementsMatch(t, scNames, dbCtx.RequireAttachmentMigration) + require.ElementsMatch(t, scNames, dbCtx.RequireAttachmentMigration) mgrStatus := waitForAttachmentMigrationState(rt, db.BackgroundProcessStateCompleted) // assert that number of docs precessed is greater than the 
total docs added, this will be because when updating