Skip to content

Commit e83d779

Browse files
committed
Create an abstract DCP client, support rosmar DCP
1 parent d61ca43 commit e83d779

26 files changed

+609
-612
lines changed

base/abstract_dcp_client.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2025-Present Couchbase, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License included
4+
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
5+
// in that file, in accordance with the Business Source License, use of this
6+
// software will be governed by the Apache License, Version 2.0, included in
7+
// the file licenses/APL2.txt.
8+
9+
package base
10+
11+
import (
12+
"context"
13+
"expvar"
14+
"fmt"
15+
16+
sgbucket "github.com/couchbase/sg-bucket"
17+
"github.com/couchbaselabs/rosmar"
18+
)
19+
20+
// DCPClient is an interface for all DCP implementations.
21+
type DCPClient interface {
22+
// Start will start the DCP feed. It returns a channel marking the end of the feed.
23+
Start(ctx context.Context) (chan error, error)
24+
// Close will shut down the DCP feed.
25+
Close()
26+
// GetMetadata returns the current DCP metadata.
27+
GetMetadata() []DCPMetadata
28+
// GetMetadataKeyPrefix returns the key prefix used for storing any persistent data.
29+
GetMetadataKeyPrefix() string
30+
}
31+
32+
// DCPClientOptions are options for creating a DCPClient.
33+
type DCPClientOptions struct {
34+
FeedPrefix string // name of the DCP feed, used for logging locally and stored by Couchbase Server
35+
Callback sgbucket.FeedEventCallbackFunc // callback function for DCP events
36+
DBStats *expvar.Map // these options are used only for gocbcore implementation, these stats are not shared by prometheus stats
37+
CheckpointPrefix string // start of the checkpoint documents
38+
CollectionNames CollectionNames // scopes and collections to monitor
39+
InitialMetadata []DCPMetadata // initial metadata to seed the DCP client with
40+
MetadataStoreType DCPMetadataStoreType // persistent or in memory storage
41+
OneShot bool // if true, the feed runs to latest document found when the client is started
42+
FailOnRollback bool // if true, fail Start if the current DCP checkpoints encounter a rollback condition
43+
Terminator chan bool // optional channel that can be closed to terminate the DCP feed, this will be replaced with a context option.
44+
FromLatestSequence bool // If true, start at latest sequence.
45+
}
46+
47+
// NewDCPClient creates a new DCPClient to receive events from a bucket.
48+
func NewDCPClient(ctx context.Context, bucket Bucket, opts DCPClientOptions) (DCPClient, error) {
49+
if opts.FeedPrefix == "" {
50+
return nil, fmt.Errorf("DCPClientOptions.IDPrefix must be provided")
51+
} else if bucket == nil {
52+
return nil, fmt.Errorf("bucket must be provided")
53+
} else if opts.Callback == nil {
54+
return nil, fmt.Errorf("DCPClientOptions.Callback must be provided")
55+
} else if len(opts.CollectionNames) == 0 {
56+
return nil, fmt.Errorf("DCPClientOptions.CollectionNames must be provided")
57+
} else if opts.FromLatestSequence && len(opts.InitialMetadata) > 0 {
58+
return nil, fmt.Errorf("DCPClientOptions.InitialMetadata cannot be provided when FromLatestSequence is true")
59+
} else if opts.MetadataStoreType == DCPMetadataStoreInMemory && opts.CheckpointPrefix != "" {
60+
return nil, fmt.Errorf("DCPClientOptions.CheckpointPrefix cannot be provided when MetadataStoreType is InMemory")
61+
}
62+
underlyingBucket := GetBaseBucket(bucket)
63+
if _, ok := underlyingBucket.(*rosmar.Bucket); ok {
64+
return NewRosmarDCPClient(bucket, opts)
65+
} else if gocbBucket, ok := underlyingBucket.(*GocbV2Bucket); ok {
66+
return newGocbDCPClient(ctx, gocbBucket, opts)
67+
}
68+
return nil, fmt.Errorf("bucket type %T does not have a DCPClient implementation", underlyingBucket)
69+
}
70+
71+
// StartDCPFeed creates and starts a DCP feed. This function will return as soon as the feed is started. doneChan is
72+
// sent a single error value when the feed terminates.
73+
func StartDCPFeed(ctx context.Context, bucket Bucket, opts DCPClientOptions) (doneChan <-chan error, err error) {
74+
client, err := NewDCPClient(ctx, bucket, opts)
75+
if err != nil {
76+
return nil, err
77+
}
78+
bucketName := bucket.GetName()
79+
feedName := opts.FeedPrefix
80+
81+
doneChan, err = client.Start(ctx)
82+
if err != nil {
83+
ErrorfCtx(ctx, "Failed to start DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
84+
client.Close()
85+
ErrorfCtx(ctx, "Finished calling async close error from DCP Feed %q for bucket %q: %v", feedName, MD(bucketName), err)
86+
if doneChan != nil {
87+
<-doneChan
88+
}
89+
return nil, err
90+
}
91+
InfofCtx(ctx, KeyDCP, "Started DCP Feed %q for bucket %q", feedName, MD(bucketName))
92+
go func() {
93+
select {
94+
case err := <-doneChan:
95+
WarnfCtx(ctx, "DCP Feed %q for bucket %q closed unexpectedly: %v", feedName, MD(bucketName), err)
96+
// FIXME: close dbContext here
97+
break
98+
case <-opts.Terminator:
99+
InfofCtx(ctx, KeyDCP, "Closing DCP Feed %q for bucket %q based on termination notification", feedName, MD(bucketName))
100+
client.Close()
101+
dcpCloseErr := <-doneChan
102+
if dcpCloseErr != nil {
103+
WarnfCtx(ctx, "Error on closing DCP Feed %q for %q: %v", feedName, MD(bucketName), dcpCloseErr)
104+
}
105+
break
106+
}
107+
}()
108+
return doneChan, err
109+
}

base/collection.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,7 @@ func (b *GocbV2Bucket) IsSupported(feature sgbucket.BucketStoreFeature) bool {
263263
}
264264

265265
func (b *GocbV2Bucket) StartDCPFeed(ctx context.Context, args sgbucket.FeedArguments, callback sgbucket.FeedEventCallbackFunc, dbStats *expvar.Map) error {
266-
groupID := ""
267-
return StartGocbDCPFeed(ctx, b, b.Spec.BucketName, args, callback, dbStats, DCPMetadataStoreInMemory, groupID)
266+
return errors.New("GocbV2Bucket does not support StartDCPFeed; use NewDCPClient instead")
268267
}
269268

270269
func (b *GocbV2Bucket) GetStatsVbSeqno(maxVbno uint16, useAbsHighSeqNo bool) (uuids map[uint16]uint64, highSeqnos map[uint16]uint64, seqErr error) {

base/collection_common.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package base
1212

1313
import (
1414
"errors"
15+
"slices"
1516

1617
sgbucket "github.com/couchbase/sg-bucket"
1718
)
@@ -20,6 +21,9 @@ var ErrCollectionsUnsupported = errors.New("collections not supported")
2021

2122
type ScopeAndCollectionName = sgbucket.DataStoreNameImpl
2223

24+
// CollectionNames is map of scope name to slice of collection names.
25+
type CollectionNames map[string][]string
26+
2327
func DefaultScopeAndCollectionName() ScopeAndCollectionName {
2428
return ScopeAndCollectionName{Scope: DefaultScope, Collection: DefaultCollection}
2529
}
@@ -45,3 +49,16 @@ func (s ScopeAndCollectionNames) ScopeAndCollectionNames() []string {
4549
func FullyQualifiedCollectionName(bucketName, scopeName, collectionName string) string {
4650
return bucketName + "." + scopeName + "." + collectionName
4751
}
52+
53+
// Add adds any collections to the collections. Any duplicates will be ignored.
54+
func (c CollectionNames) Add(ds ...sgbucket.DataStoreName) {
55+
for _, d := range ds {
56+
if _, ok := c[d.ScopeName()]; !ok {
57+
c[d.ScopeName()] = []string{}
58+
} else if slices.Contains(c[d.ScopeName()], d.CollectionName()) {
59+
// avoid duplicates
60+
continue
61+
}
62+
c[d.ScopeName()] = append(c[d.ScopeName()], d.CollectionName())
63+
}
64+
}

0 commit comments

Comments
 (0)