1 change: 1 addition & 0 deletions base/bucket.go
@@ -55,6 +55,7 @@ type WrappingDatastore interface {
type CouchbaseBucketStore interface {
GetName() string
MgmtEps() ([]string, error)
VersionPruningWindow(ctx context.Context) (time.Duration, error)
MetadataPurgeInterval(ctx context.Context) (time.Duration, error)
MaxTTL(context.Context) (int, error)
HttpClient(context.Context) *http.Client
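The new accessor sits alongside MetadataPurgeInterval and is consumed the same way. Below is a minimal sketch of a caller, assuming a base.Bucket handle; the helper name is hypothetical, and the 30-day fallback mirrors the default introduced in db/database.go later in this diff (imports assumed: "context", "time").

// versionPruningWindowOrDefault is a hypothetical helper (not part of this PR) showing how
// the new interface method is typically consumed: Couchbase-backed buckets report their
// configured window, anything else falls back to a 30-day default.
func versionPruningWindowOrDefault(ctx context.Context, bucket base.Bucket) time.Duration {
	const defaultWindow = 30 * 24 * time.Hour
	cbStore, ok := base.AsCouchbaseBucketStore(bucket)
	if !ok {
		return defaultWindow // e.g. walrus test buckets, which have no server-side window
	}
	vpw, err := cbStore.VersionPruningWindow(ctx)
	if err != nil || vpw <= 0 {
		return defaultWindow
	}
	return vpw
}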
19 changes: 19 additions & 0 deletions base/bucket_gocb_test.go
@@ -2632,3 +2632,22 @@ func TestWriteUpdateWithXattrsDocumentTombstone(t *testing.T) {
	require.JSONEq(t, string(xattrModifiedBody), string(xattrs[xattr1Key]))
	require.NotContains(t, xattrs, xattr2Key)
}

func TestVersionPruningWindow(t *testing.T) {
	if UnitTestUrlIsWalrus() {
		t.Skip("This test only works against Couchbase Server")
	}

	ctx := TestCtx(t)
	bucket := GetTestBucket(t)
	defer bucket.Close(ctx)

	cbStore, ok := AsCouchbaseBucketStore(bucket)
	require.True(t, ok)

	vpw, err := cbStore.VersionPruningWindow(ctx)
	require.NoError(t, err)

	// It's assumed that wherever this test runs, the bucket still has the default pruning window (30 days) - at minimum this ensures we can retrieve the value.
	assert.Equal(t, time.Hour*24*30, vpw)
}
27 changes: 25 additions & 2 deletions base/collection.go
@@ -468,12 +468,35 @@ func (b *GocbV2Bucket) QueryEpsCount() (int, error) {
	return len(agent.N1qlEps()), nil
}

// Gets the metadata purge interval for the bucket. First checks for a bucket-specific value. If not
// found, retrieves the cluster-wide value.
// MetadataPurgeInterval gets the metadata purge interval for the bucket. Checks for a bucket-specific value before the cluster value.
func (b *GocbV2Bucket) MetadataPurgeInterval(ctx context.Context) (time.Duration, error) {
	return getMetadataPurgeInterval(ctx, b)
}

// VersionPruningWindow gets the version pruning window for the bucket.
func (b *GocbV2Bucket) VersionPruningWindow(ctx context.Context) (time.Duration, error) {
	uri := fmt.Sprintf("/pools/default/buckets/%s", b.GetName())
	respBytes, statusCode, err := b.MgmtRequest(ctx, http.MethodGet, uri, "application/json", nil)
	if err != nil {
		return 0, err
	}

	if statusCode == http.StatusForbidden {
		return 0, RedactErrorf("403 Forbidden attempting to access %s. Bucket user must have Bucket Full Access and Bucket Admin roles to retrieve version pruning window.", UD(uri))
	} else if statusCode != http.StatusOK {
		return 0, fmt.Errorf("failed with status code %d", statusCode)
	}

	var response struct {
		VersionPruningWindowHrs int64 `json:"versionPruningWindowHrs,omitempty"`
	}
	if err := JSONUnmarshal(respBytes, &response); err != nil {
		return 0, err
	}

	return time.Duration(response.VersionPruningWindowHrs) * time.Hour, nil
}

func (b *GocbV2Bucket) MaxTTL(ctx context.Context) (int, error) {
	return getMaxTTL(ctx, b)
}
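The bucket management endpoint reports the window in whole hours under versionPruningWindowHrs - 720 for the 30-day default asserted in TestVersionPruningWindow above - and the code converts that directly to a time.Duration. Below is a standalone sketch of just the parse-and-convert step; the helper name and the trimmed JSON payload are illustrative, and the standard-library json.Unmarshal stands in for the package's JSONUnmarshal wrapper (imports assumed: "encoding/json", "time").

// parseVersionPruningWindow mirrors the response handling above for a raw response body.
func parseVersionPruningWindow(respBytes []byte) (time.Duration, error) {
	var response struct {
		VersionPruningWindowHrs int64 `json:"versionPruningWindowHrs,omitempty"`
	}
	if err := json.Unmarshal(respBytes, &response); err != nil {
		return 0, err
	}
	return time.Duration(response.VersionPruningWindowHrs) * time.Hour, nil
}

// Example: parseVersionPruningWindow([]byte(`{"versionPruningWindowHrs":720}`))
// returns 720h0m0s, i.e. time.Hour*24*30 - the value the test above expects.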
4 changes: 2 additions & 2 deletions db/crud.go
@@ -1033,8 +1033,8 @@ func (db *DatabaseCollectionWithUser) updateHLV(ctx context.Context, d *Documen
	}
	// clean up PV only if we have more than a handful of source IDs - reduce Compaction and false-conflict risk where we don't need it
	if len(d.HLV.PreviousVersions) > minPVEntriesBeforeCompaction {
		mpi := db.dbCtx.GetMetadataPurgeInterval(ctx, false)
		d.HLV.Compact(ctx, d.ID, mpi)
		vpw := db.dbCtx.GetVersionPruningWindow(ctx, false)
		d.HLV.Compact(ctx, d.ID, vpw)
	}
	d.SyncData.SetCV(d.HLV)
	return d, nil
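With this change, PV compaction is driven by the same window XDCR uses for HLV pruning rather than the metadata purge interval. HLV.Compact's implementation is not part of this diff; the sketch below only illustrates the kind of cutoff such a window implies, assuming pruning-eligible entries can be dated, and noting that the replicator test at the end of this diff relies on a zero window skipping compaction (import assumed: "time").

// Illustrative only - not the actual HLV.Compact logic.
// An entry older than (now - window) would be a candidate for pruning;
// a zero or negative window is treated here as "never prune".
func olderThanPruningWindow(entryTime time.Time, window time.Duration) bool {
	if window <= 0 {
		return false
	}
	return entryTime.Before(time.Now().Add(-window))
}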
127 changes: 83 additions & 44 deletions db/database.go
@@ -69,6 +69,7 @@ const (
// used as default metadata purge interval when the server’s purge
// interval (either bucket specific or cluster wide) is not available.
DefaultPurgeInterval = 30 * 24 * time.Hour
DefaultVersionPruningWindow = DefaultPurgeInterval
DefaultSGReplicateEnabled = true
DefaultSGReplicateWebsocketPingInterval = time.Minute * 5
DefaultCompactInterval = 24 * time.Hour
@@ -159,57 +160,59 @@ type DatabaseContext struct {
BroadcastSlowMode atomic.Bool // bool to indicate if a slower ticker value should be used to notify changes feeds of changes
DatabaseStartupError *DatabaseError // Error that occurred during database online processes startup
CachedPurgeInterval atomic.Pointer[time.Duration] // If set, the cached value of the purge interval to avoid repeated lookups
CachedVersionPruningWindow atomic.Pointer[time.Duration] // If set, the cached value of the version pruning window to avoid repeated lookups
}

type Scope struct {
Collections map[string]*DatabaseCollection
}

type DatabaseContextOptions struct {
CacheOptions *CacheOptions
RevisionCacheOptions *RevisionCacheOptions
OldRevExpirySeconds uint32
AdminInterface *string
UnsupportedOptions *UnsupportedOptions
OIDCOptions *auth.OIDCOptions
LocalJWTConfig auth.LocalJWTConfig
ImportOptions ImportOptions
EnableXattr bool // Use xattr for _sync
LocalDocExpirySecs uint32 // The _local doc expiry time in seconds
SecureCookieOverride bool // Pass-through DBConfig.SecureCookieOverride
SessionCookieName string // Pass-through DbConfig.SessionCookieName
SessionCookieHttpOnly bool // Pass-through DbConfig.SessionCookieHTTPOnly
UserFunctions *UserFunctions // JS/N1QL functions clients can call
AllowConflicts *bool // False forbids creating conflicts
SendWWWAuthenticateHeader *bool // False disables setting of 'WWW-Authenticate' header
DisablePasswordAuthentication bool // True enforces OIDC/guest only
UseViews bool // Force use of views
DeltaSyncOptions DeltaSyncOptions // Delta Sync Options
CompactInterval uint32 // Interval in seconds between compaction is automatically ran - 0 means don't run
SGReplicateOptions SGReplicateOptions
SlowQueryWarningThreshold time.Duration
QueryPaginationLimit int // Limit used for pagination of queries. If not set defaults to DefaultQueryPaginationLimit
UserXattrKey string // Key of user xattr that will be accessible from the Sync Function. If empty the feature will be disabled.
ClientPartitionWindow time.Duration
BcryptCost int
GroupID string
JavascriptTimeout time.Duration // Max time the JS functions run for (ie. sync fn, import filter)
UseLegacySyncDocsIndex bool
Scopes ScopesOptions
MetadataStore base.DataStore // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage.
MetadataID string // MetadataID used for metadata storage
BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds
ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds
ConfigPrincipals *ConfigPrincipals
TestPurgeIntervalOverride *time.Duration // If set, use this value for db.GetMetadataPurgeInterval - test seam to force specific purge interval for tests
LoggingConfig *base.DbLogConfig // Per-database log configuration
MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication
MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication
NumIndexReplicas uint // Number of replicas for GSI indexes
NumIndexPartitions *uint32 // Number of partitions for GSI indexes, if not set will default to 1
ImportVersion uint64 // Version included in import DCP checkpoints, incremented when collections added to db
DisablePublicAllDocs bool // Disable public access to the _all_docs endpoint for this database
StoreLegacyRevTreeData *bool // Whether to store additional data for legacy rev tree support in delta sync and replication backup revs
CacheOptions *CacheOptions
RevisionCacheOptions *RevisionCacheOptions
OldRevExpirySeconds uint32
AdminInterface *string
UnsupportedOptions *UnsupportedOptions
OIDCOptions *auth.OIDCOptions
LocalJWTConfig auth.LocalJWTConfig
ImportOptions ImportOptions
EnableXattr bool // Use xattr for _sync
LocalDocExpirySecs uint32 // The _local doc expiry time in seconds
SecureCookieOverride bool // Pass-through DBConfig.SecureCookieOverride
SessionCookieName string // Pass-through DbConfig.SessionCookieName
SessionCookieHttpOnly bool // Pass-through DbConfig.SessionCookieHTTPOnly
UserFunctions *UserFunctions // JS/N1QL functions clients can call
AllowConflicts *bool // False forbids creating conflicts
SendWWWAuthenticateHeader *bool // False disables setting of 'WWW-Authenticate' header
DisablePasswordAuthentication bool // True enforces OIDC/guest only
UseViews bool // Force use of views
DeltaSyncOptions DeltaSyncOptions // Delta Sync Options
CompactInterval uint32 // Interval in seconds between compaction is automatically ran - 0 means don't run
SGReplicateOptions SGReplicateOptions
SlowQueryWarningThreshold time.Duration
QueryPaginationLimit int // Limit used for pagination of queries. If not set defaults to DefaultQueryPaginationLimit
UserXattrKey string // Key of user xattr that will be accessible from the Sync Function. If empty the feature will be disabled.
ClientPartitionWindow time.Duration
BcryptCost int
GroupID string
JavascriptTimeout time.Duration // Max time the JS functions run for (ie. sync fn, import filter)
UseLegacySyncDocsIndex bool
Scopes ScopesOptions
MetadataStore base.DataStore // If set, use this location/connection for SG metadata storage - if not set, metadata is stored using the same location/connection as the bucket used for data storage.
MetadataID string // MetadataID used for metadata storage
BlipStatsReportingInterval int64 // interval to report blip stats in milliseconds
ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds
ConfigPrincipals *ConfigPrincipals
TestPurgeIntervalOverride *time.Duration // If set, use this value for db.GetMetadataPurgeInterval - test seam to force specific purge interval for tests
TestVersionPruningWindowOverride *time.Duration // If set, use this value for db.GetVersionPruningWindow - test seam to force specific pruning window for tests
LoggingConfig *base.DbLogConfig // Per-database log configuration
MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication
MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication
NumIndexReplicas uint // Number of replicas for GSI indexes
NumIndexPartitions *uint32 // Number of partitions for GSI indexes, if not set will default to 1
ImportVersion uint64 // Version included in import DCP checkpoints, incremented when collections added to db
DisablePublicAllDocs bool // Disable public access to the _all_docs endpoint for this database
StoreLegacyRevTreeData *bool // Whether to store additional data for legacy rev tree support in delta sync and replication backup revs
}

type ConfigPrincipals struct {
@@ -1626,6 +1629,42 @@ func (db *DatabaseContext) GetMetadataPurgeInterval(ctx context.Context, forceRe
	return mpi
}

// GetVersionPruningWindow returns the current value of the XDCR version pruning window for the backing bucket.
// If forceRefresh is set, we'll always fetch a new value from the bucket, even if we had one cached.
func (db *DatabaseContext) GetVersionPruningWindow(ctx context.Context, forceRefresh bool) time.Duration {
	// test override
	if db.Options.TestVersionPruningWindowOverride != nil {
		return *db.Options.TestVersionPruningWindowOverride
	}

	// fetch cached value if available
	if !forceRefresh {
		vpw := db.CachedVersionPruningWindow.Load()
		if vpw != nil {
			return *vpw
		}
	}

	// fetch from server
	cbStore, ok := base.AsCouchbaseBucketStore(db.Bucket)
	if !ok {
		return DefaultVersionPruningWindow
	}

	serverVersionPruningWindow, err := cbStore.VersionPruningWindow(ctx)
	if err != nil {
		base.WarnfCtx(ctx, "Unable to retrieve server's version pruning window - using default %.2f days. %s", DefaultVersionPruningWindow.Hours()/24, err)
	}

	vpw := DefaultVersionPruningWindow
	if serverVersionPruningWindow > 0 {
		vpw = serverVersionPruningWindow
	}

	db.CachedVersionPruningWindow.Store(&vpw)
	return vpw
}

func (c *DatabaseCollection) updateAllPrincipalsSequences(ctx context.Context) error {
	users, roles, err := c.allPrincipalIDs(ctx)
	if err != nil {
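GetVersionPruningWindow mirrors GetMetadataPurgeInterval: a test override wins, then any cached value, then a management lookup that falls back to the 30-day default and caches the result. Below is a minimal sketch of how the new test seam might be exercised in a unit test; the test itself and the setupTestDB/Close pattern are assumptions, not part of this PR, while base.Ptr and the Options field are used exactly as in the replicator test below.

func TestVersionPruningWindowOverride(t *testing.T) {
	db, ctx := setupTestDB(t) // assumed db-package test helper
	defer db.Close(ctx)

	// With the override set, GetVersionPruningWindow returns it without consulting
	// the bucket or the cached value.
	db.Options.TestVersionPruningWindowOverride = base.Ptr(time.Hour)
	require.Equal(t, time.Hour, db.GetVersionPruningWindow(ctx, false))
}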
6 changes: 3 additions & 3 deletions rest/replicatortest/replicator_conflict_test.go
@@ -1039,9 +1039,9 @@ func TestActiveReplicatorHLVConflictNoCommonMVPV(t *testing.T) {
ctx1 := rt1.Context()
ctx2 := rt2.Context()

// disable purge interval so we can avoid HLV compaction for the artificially low HLV values in this test
rt1.GetDatabase().Options.TestPurgeIntervalOverride = base.Ptr(time.Duration(0))
rt2.GetDatabase().Options.TestPurgeIntervalOverride = base.Ptr(time.Duration(0))
// disable pruning window so we can avoid HLV compaction for the artificially low HLV values in this test
rt1.GetDatabase().Options.TestVersionPruningWindowOverride = base.Ptr(time.Duration(0))
rt2.GetDatabase().Options.TestVersionPruningWindowOverride = base.Ptr(time.Duration(0))

docID := "doc1_"
version := rt2.PutDoc(docID, `{"source":"rt2","channels":["alice"]}`)