From 19164217577ef9aadee429b89925a7bab241da39 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Thu, 25 Sep 2025 12:39:55 +0100 Subject: [PATCH 1/4] Use `versionPruningWindowHrs` for HLV Compaction instead of metadata purge interval --- base/bucket.go | 1 + base/bucket_gocb_test.go | 19 +++++++++++++++++++ base/collection.go | 27 +++++++++++++++++++++++++-- db/crud.go | 4 ++-- db/database.go | 33 +++++++++++++++++++++++++++++++++ 5 files changed, 80 insertions(+), 4 deletions(-) diff --git a/base/bucket.go b/base/bucket.go index 95dab7d57a..2a0f22e69b 100644 --- a/base/bucket.go +++ b/base/bucket.go @@ -55,6 +55,7 @@ type WrappingDatastore interface { type CouchbaseBucketStore interface { GetName() string MgmtEps() ([]string, error) + VersionPruningWindow(ctx context.Context) (time.Duration, error) MetadataPurgeInterval(ctx context.Context) (time.Duration, error) MaxTTL(context.Context) (int, error) HttpClient(context.Context) *http.Client diff --git a/base/bucket_gocb_test.go b/base/bucket_gocb_test.go index 9dc9afbb5e..c92fa728fe 100644 --- a/base/bucket_gocb_test.go +++ b/base/bucket_gocb_test.go @@ -2632,3 +2632,22 @@ func TestWriteUpdateWithXattrsDocumentTombstone(t *testing.T) { require.JSONEq(t, string(xattrModifiedBody), string(xattrs[xattr1Key])) require.NotContains(t, xattrs, xattr2Key) } + +func TestVersionPruningWindow(t *testing.T) { + if UnitTestUrlIsWalrus() { + t.Skip("This test only works against Couchbase Server") + } + + ctx := TestCtx(t) + bucket := GetTestBucket(t) + defer bucket.Close(ctx) + + cbStore, ok := AsCouchbaseBucketStore(bucket) + require.True(t, ok) + + vpw, err := cbStore.VersionPruningWindow(ctx) + require.NoError(t, err) + + // it's assumed that anywhere this test is running has a default pruning window (30 days) - but this at least ensures we can retrieve it + assert.Equal(t, time.Hour*24*30, vpw) +} diff --git a/base/collection.go b/base/collection.go index fa68fd80af..7e09cea53f 100644 --- a/base/collection.go +++ b/base/collection.go @@ -468,12 +468,35 @@ func (b *GocbV2Bucket) QueryEpsCount() (int, error) { return len(agent.N1qlEps()), nil } -// Gets the metadata purge interval for the bucket. First checks for a bucket-specific value. If not -// found, retrieves the cluster-wide value. +// MetadataPurgeInterval gets the metadata purge interval for the bucket. Checks for a bucket-specific value before the cluster value. func (b *GocbV2Bucket) MetadataPurgeInterval(ctx context.Context) (time.Duration, error) { return getMetadataPurgeInterval(ctx, b) } +// VersionPruningWindow gets the version pruning window for the bucket. +func (b *GocbV2Bucket) VersionPruningWindow(ctx context.Context) (time.Duration, error) { + uri := fmt.Sprintf("/pools/default/buckets/%s", b.GetName()) + respBytes, statusCode, err := b.MgmtRequest(ctx, http.MethodGet, uri, "application/json", nil) + if err != nil { + return 0, err + } + + if statusCode == http.StatusForbidden { + WarnfCtx(ctx, "403 Forbidden attempting to access %s. Bucket user must have Bucket Full Access and Bucket Admin roles to retrieve version pruning window.", UD(uri)) + } else if statusCode != http.StatusOK { + return 0, errors.New(fmt.Sprintf("failed with status code, %d, statusCode", statusCode)) + } + + var response struct { + VersionPruningWindowHrs int64 `json:"versionPruningWindowHrs,omitempty"` + } + if err := JSONUnmarshal(respBytes, &response); err != nil { + return 0, err + } + + return time.Duration(response.VersionPruningWindowHrs) * time.Hour, nil +} + func (b *GocbV2Bucket) MaxTTL(ctx context.Context) (int, error) { return getMaxTTL(ctx, b) } diff --git a/db/crud.go b/db/crud.go index c8556ec8fa..79c0bb0e28 100644 --- a/db/crud.go +++ b/db/crud.go @@ -1033,8 +1033,8 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Document } // clean up PV only if we have more than a handful of source IDs - reduce Compaction and false-conflict risk where we don't need it if len(d.HLV.PreviousVersions) > minPVEntriesBeforeCompaction { - mpi := db.dbCtx.GetMetadataPurgeInterval(ctx, false) - d.HLV.Compact(ctx, d.ID, mpi) + vpw := db.dbCtx.GetVersionPruningWindow(ctx, false) + d.HLV.Compact(ctx, d.ID, vpw) } d.SyncData.SetCV(d.HLV) return d, nil diff --git a/db/database.go b/db/database.go index aa62dcfd39..c9b3a87d3c 100644 --- a/db/database.go +++ b/db/database.go @@ -69,6 +69,7 @@ const ( // used as default metadata purge interval when the server’s purge // interval (either bucket specific or cluster wide) is not available. DefaultPurgeInterval = 30 * 24 * time.Hour + DefaultVersionPruningWindow = DefaultPurgeInterval DefaultSGReplicateEnabled = true DefaultSGReplicateWebsocketPingInterval = time.Minute * 5 DefaultCompactInterval = 24 * time.Hour @@ -159,6 +160,7 @@ type DatabaseContext struct { BroadcastSlowMode atomic.Bool // bool to indicate if a slower ticker value should be used to notify changes feeds of changes DatabaseStartupError *DatabaseError // Error that occurred during database online processes startup CachedPurgeInterval atomic.Pointer[time.Duration] // If set, the cached value of the purge interval to avoid repeated lookups + CachedVersionPruningWindow atomic.Pointer[time.Duration] // If set, the cached value of the version pruning window to avoid repeated lookups } type Scope struct { @@ -1626,6 +1628,37 @@ func (db *DatabaseContext) GetMetadataPurgeInterval(ctx context.Context, forceRe return mpi } +// GetVersionPruningWindow returns the current value for the XDCR Version Pruning Window for the backing bucket. +// if forceRefresh is set, we'll always fetch a new value from the bucket, even if we had one cached. +func (db *DatabaseContext) GetVersionPruningWindow(ctx context.Context, forceRefresh bool) time.Duration { + // fetch cached value if available + if !forceRefresh { + vpw := db.CachedVersionPruningWindow.Load() + if vpw != nil { + return *vpw + } + } + + // fetch from server + cbStore, ok := base.AsCouchbaseBucketStore(db.Bucket) + if !ok { + return DefaultVersionPruningWindow + } + + serverVersionPruningWindow, err := cbStore.VersionPruningWindow(ctx) + if err != nil { + base.WarnfCtx(ctx, "Unable to retrieve server's version pruning window - using default %.2f days. %s", DefaultVersionPruningWindow.Hours()/24, err) + } + + vpw := DefaultVersionPruningWindow + if serverVersionPruningWindow > 0 { + vpw = serverVersionPruningWindow + } + + db.CachedVersionPruningWindow.Store(&vpw) + return vpw +} + func (c *DatabaseCollection) updateAllPrincipalsSequences(ctx context.Context) error { users, roles, err := c.allPrincipalIDs(ctx) if err != nil { From 70b73561c3f7574ee3d7675cde5888007968be03 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Thu, 25 Sep 2025 12:51:04 +0100 Subject: [PATCH 2/4] add test override for version pruning window --- db/database.go | 94 ++++++++++--------- .../replicator_conflict_test.go | 6 +- 2 files changed, 53 insertions(+), 47 deletions(-) diff --git a/db/database.go b/db/database.go index c9b3a87d3c..637d743bf8 100644 --- a/db/database.go +++ b/db/database.go @@ -168,50 +168,51 @@ type Scope struct { } type DatabaseContextOptions struct { - CacheOptions *CacheOptions - RevisionCacheOptions *RevisionCacheOptions - OldRevExpirySeconds uint32 - AdminInterface *string - UnsupportedOptions *UnsupportedOptions - OIDCOptions *auth.OIDCOptions - LocalJWTConfig auth.LocalJWTConfig - ImportOptions ImportOptions - EnableXattr bool // Use xattr for _sync - LocalDocExpirySecs uint32 // The _local doc expiry time in seconds - SecureCookieOverride bool // Pass-through DBConfig.SecureCookieOverride - SessionCookieName string // Pass-through DbConfig.SessionCookieName - SessionCookieHttpOnly bool // Pass-through DbConfig.SessionCookieHTTPOnly - UserFunctions *UserFunctions // JS/N1QL functions clients can call - AllowConflicts *bool // False forbids creating conflicts - SendWWWAuthenticateHeader *bool // False disables setting of 'WWW-Authenticate' header - DisablePasswordAuthentication bool // True enforces OIDC/guest only - UseViews bool // Force use of views - DeltaSyncOptions DeltaSyncOptions // Delta Sync Options - CompactInterval uint32 // Interval in seconds between compaction is automatically ran - 0 means don't run - SGReplicateOptions SGReplicateOptions - SlowQueryWarningThreshold time.Duration - QueryPaginationLimit int // Limit used for pagination of queries. If not set defaults to DefaultQueryPaginationLimit - UserXattrKey string // Key of user xattr that will be accessible from the Sync Function. If empty the feature will be disabled. - ClientPartitionWindow time.Duration - BcryptCost int - GroupID string - JavascriptTimeout time.Duration // Max time the JS functions run for (ie. sync fn, import filter) - UseLegacySyncDocsIndex bool - Scopes ScopesOptions - MetadataStore base.DataStore // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage. - MetadataID string // MetadataID used for metadata storage - BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds - ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds - ConfigPrincipals *ConfigPrincipals - TestPurgeIntervalOverride *time.Duration // If set, use this value for db.GetMetadataPurgeInterval - test seam to force specific purge interval for tests - LoggingConfig *base.DbLogConfig // Per-database log configuration - MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication - MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication - NumIndexReplicas uint // Number of replicas for GSI indexes - NumIndexPartitions *uint32 // Number of partitions for GSI indexes, if not set will default to 1 - ImportVersion uint64 // Version included in import DCP checkpoints, incremented when collections added to db - DisablePublicAllDocs bool // Disable public access to the _all_docs endpoint for this database - StoreLegacyRevTreeData *bool // Whether to store additional data for legacy rev tree support in delta sync and replication backup revs + CacheOptions *CacheOptions + RevisionCacheOptions *RevisionCacheOptions + OldRevExpirySeconds uint32 + AdminInterface *string + UnsupportedOptions *UnsupportedOptions + OIDCOptions *auth.OIDCOptions + LocalJWTConfig auth.LocalJWTConfig + ImportOptions ImportOptions + EnableXattr bool // Use xattr for _sync + LocalDocExpirySecs uint32 // The _local doc expiry time in seconds + SecureCookieOverride bool // Pass-through DBConfig.SecureCookieOverride + SessionCookieName string // Pass-through DbConfig.SessionCookieName + SessionCookieHttpOnly bool // Pass-through DbConfig.SessionCookieHTTPOnly + UserFunctions *UserFunctions // JS/N1QL functions clients can call + AllowConflicts *bool // False forbids creating conflicts + SendWWWAuthenticateHeader *bool // False disables setting of 'WWW-Authenticate' header + DisablePasswordAuthentication bool // True enforces OIDC/guest only + UseViews bool // Force use of views + DeltaSyncOptions DeltaSyncOptions // Delta Sync Options + CompactInterval uint32 // Interval in seconds between compaction is automatically ran - 0 means don't run + SGReplicateOptions SGReplicateOptions + SlowQueryWarningThreshold time.Duration + QueryPaginationLimit int // Limit used for pagination of queries. If not set defaults to DefaultQueryPaginationLimit + UserXattrKey string // Key of user xattr that will be accessible from the Sync Function. If empty the feature will be disabled. + ClientPartitionWindow time.Duration + BcryptCost int + GroupID string + JavascriptTimeout time.Duration // Max time the JS functions run for (ie. sync fn, import filter) + UseLegacySyncDocsIndex bool + Scopes ScopesOptions + MetadataStore base.DataStore // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage. + MetadataID string // MetadataID used for metadata storage + BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds + ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds + ConfigPrincipals *ConfigPrincipals + TestPurgeIntervalOverride *time.Duration // If set, use this value for db.GetMetadataPurgeInterval - test seam to force specific purge interval for tests + TestVersionPruningWindowOverride *time.Duration // If set, use this value for db.GetVersionPruningWindow - test seam to force specific pruning window for tests + LoggingConfig *base.DbLogConfig // Per-database log configuration + MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication + MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication + NumIndexReplicas uint // Number of replicas for GSI indexes + NumIndexPartitions *uint32 // Number of partitions for GSI indexes, if not set will default to 1 + ImportVersion uint64 // Version included in import DCP checkpoints, incremented when collections added to db + DisablePublicAllDocs bool // Disable public access to the _all_docs endpoint for this database + StoreLegacyRevTreeData *bool // Whether to store additional data for legacy rev tree support in delta sync and replication backup revs } type ConfigPrincipals struct { @@ -1631,6 +1632,11 @@ func (db *DatabaseContext) GetMetadataPurgeInterval(ctx context.Context, forceRe // GetVersionPruningWindow returns the current value for the XDCR Version Pruning Window for the backing bucket. // if forceRefresh is set, we'll always fetch a new value from the bucket, even if we had one cached. func (db *DatabaseContext) GetVersionPruningWindow(ctx context.Context, forceRefresh bool) time.Duration { + // test override + if db.Options.TestVersionPruningWindowOverride != nil { + return *db.Options.TestVersionPruningWindowOverride + } + // fetch cached value if available if !forceRefresh { vpw := db.CachedVersionPruningWindow.Load() diff --git a/rest/replicatortest/replicator_conflict_test.go b/rest/replicatortest/replicator_conflict_test.go index c5d1e9bded..2a2a7c3112 100644 --- a/rest/replicatortest/replicator_conflict_test.go +++ b/rest/replicatortest/replicator_conflict_test.go @@ -1039,9 +1039,9 @@ func TestActiveReplicatorHLVConflictNoCommonMVPV(t *testing.T) { ctx1 := rt1.Context() ctx2 := rt2.Context() - // disable purge interval so we can avoid HLV compaction for the artificially low HLV values in this test - rt1.GetDatabase().Options.TestPurgeIntervalOverride = base.Ptr(time.Duration(0)) - rt2.GetDatabase().Options.TestPurgeIntervalOverride = base.Ptr(time.Duration(0)) + // disable pruning window so we can avoid HLV compaction for the artificially low HLV values in this test + rt1.GetDatabase().Options.TestVersionPruningWindowOverride = base.Ptr(time.Duration(0)) + rt2.GetDatabase().Options.TestVersionPruningWindowOverride = base.Ptr(time.Duration(0)) docID := "doc1_" version := rt2.PutDoc(docID, `{"source":"rt2","channels":["alice"]}`) From 303ead420ec71b1c036e1cbac8ffe966558cbac0 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Thu, 25 Sep 2025 13:04:08 +0100 Subject: [PATCH 3/4] Update base/collection.go --- base/collection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/base/collection.go b/base/collection.go index 7e09cea53f..6771945d1a 100644 --- a/base/collection.go +++ b/base/collection.go @@ -484,7 +484,7 @@ func (b *GocbV2Bucket) VersionPruningWindow(ctx context.Context) (time.Duration, if statusCode == http.StatusForbidden { WarnfCtx(ctx, "403 Forbidden attempting to access %s. Bucket user must have Bucket Full Access and Bucket Admin roles to retrieve version pruning window.", UD(uri)) } else if statusCode != http.StatusOK { - return 0, errors.New(fmt.Sprintf("failed with status code, %d, statusCode", statusCode)) + return 0, fmt.Errorf("failed with status code %d", statusCode) } var response struct { From 0133f77cce5b8de160ae98c38e18c9d80acf4cb8 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 26 Sep 2025 14:20:37 +0100 Subject: [PATCH 4/4] Make GocbV2Bucket.VersionPruningWindow return error for 403s --- base/collection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/base/collection.go b/base/collection.go index 6771945d1a..14ff151d24 100644 --- a/base/collection.go +++ b/base/collection.go @@ -482,7 +482,7 @@ func (b *GocbV2Bucket) VersionPruningWindow(ctx context.Context) (time.Duration, } if statusCode == http.StatusForbidden { - WarnfCtx(ctx, "403 Forbidden attempting to access %s. Bucket user must have Bucket Full Access and Bucket Admin roles to retrieve version pruning window.", UD(uri)) + return 0, RedactErrorf("403 Forbidden attempting to access %s. Bucket user must have Bucket Full Access and Bucket Admin roles to retrieve version pruning window.", UD(uri)) } else if statusCode != http.StatusOK { return 0, fmt.Errorf("failed with status code %d", statusCode) }