diff --git a/cbdatasource/cbdatasource.go b/cbdatasource/cbdatasource.go index d40f695..3067e7a 100644 --- a/cbdatasource/cbdatasource.go +++ b/cbdatasource/cbdatasource.go @@ -253,6 +253,8 @@ type BucketDataSourceStats struct { TotWorkerAuth uint64 TotWorkerAuthErr uint64 TotWorkerAuthFail uint64 + TotWorkerSelBktFail uint64 + TotWorkerSelBktOk uint64 TotWorkerAuthOk uint64 TotWorkerUPROpenErr uint64 TotWorkerUPROpenOk uint64 @@ -352,7 +354,8 @@ type bucketDataSource struct { bucketName string bucketUUID string vbucketIDs []uint16 - auth couchbase.AuthHandler + auth couchbase.AuthHandler // auth for couchbase + authMemchd couchbase.AuthHandler // auth for memcached receiver Receiver options *BucketDataSourceOptions @@ -392,6 +395,7 @@ func NewBucketDataSource( bucketUUID string, vbucketIDs []uint16, auth couchbase.AuthHandler, + authMemchd couchbase.AuthHandler, receiver Receiver, options *BucketDataSourceOptions) (BucketDataSource, error) { if len(serverURLs) < 1 { @@ -416,6 +420,7 @@ func NewBucketDataSource( bucketUUID: bucketUUID, vbucketIDs: vbucketIDs, auth: auth, + authMemchd: authMemchd, receiver: receiver, options: options, @@ -720,8 +725,8 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int { defer client.Close() atomic.AddUint64(&d.stats.TotWorkerConnectOk, 1) - if d.auth != nil { - user, pswd, _ := d.auth.GetCredentials() + if d.authMemchd != nil { + user, pswd, _ := d.authMemchd.GetCredentials() if user != "" { atomic.AddUint64(&d.stats.TotWorkerAuth, 1) res, err := client.Auth(user, pswd) @@ -738,6 +743,13 @@ func (d *bucketDataSource) worker(server string, workerCh chan []uint16) int { } atomic.AddUint64(&d.stats.TotWorkerAuthOk, 1) } + _, err = client.SelectBucket(d.bucketName) + if err != nil { + atomic.AddUint64(&d.stats.TotWorkerSelBktFail, 1) + d.receiver.OnError(fmt.Errorf("worker select bucket err: %v", err)) + return 0 + } + atomic.AddUint64(&d.stats.TotWorkerSelBktOk, 1) } uprOpenName := d.options.Name