Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions cbdatasource/cbdatasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ type BucketDataSourceStats struct {
TotWorkerAuth uint64
TotWorkerAuthErr uint64
TotWorkerAuthFail uint64
TotWorkerSelBktFail uint64
TotWorkerSelBktOk uint64
TotWorkerAuthOk uint64
TotWorkerUPROpenErr uint64
TotWorkerUPROpenOk uint64
Expand Down Expand Up @@ -352,7 +354,8 @@ type bucketDataSource struct {
bucketName string
bucketUUID string
vbucketIDs []uint16
auth couchbase.AuthHandler
auth couchbase.AuthHandler // auth for couchbase
authMemchd couchbase.AuthHandler // auth for memcached
receiver Receiver
options *BucketDataSourceOptions

Expand Down Expand Up @@ -392,6 +395,7 @@ func NewBucketDataSource(
bucketUUID string,
vbucketIDs []uint16,
auth couchbase.AuthHandler,
authMemchd couchbase.AuthHandler,
receiver Receiver,
options *BucketDataSourceOptions) (BucketDataSource, error) {
if len(serverURLs) < 1 {
Expand All @@ -416,6 +420,7 @@ func NewBucketDataSource(
bucketUUID: bucketUUID,
vbucketIDs: vbucketIDs,
auth: auth,
authMemchd: authMemchd,
receiver: receiver,
options: options,

Expand Down Expand Up @@ -720,8 +725,8 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
defer client.Close()
atomic.AddUint64(&d.stats.TotWorkerConnectOk, 1)

if d.auth != nil {
user, pswd, _ := d.auth.GetCredentials()
if d.authMemchd != nil {
user, pswd, _ := d.authMemchd.GetCredentials()
if user != "" {
atomic.AddUint64(&d.stats.TotWorkerAuth, 1)
res, err := client.Auth(user, pswd)
Expand All @@ -738,6 +743,13 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int {
}
atomic.AddUint64(&d.stats.TotWorkerAuthOk, 1)
}
_, err = client.SelectBucket(d.bucketName)
if err != nil {
atomic.AddUint64(&d.stats.TotWorkerSelBktFail, 1)
d.receiver.OnError(fmt.Errorf("worker select bucket err: %v", err))
return 0
}
atomic.AddUint64(&d.stats.TotWorkerSelBktOk, 1)
}

uprOpenName := d.options.Name
Expand Down