From ee50c4d9e9a0472d4e4be0a0eda777a1681af3f8 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Sun, 14 Sep 2025 19:59:22 -0400 Subject: [PATCH 1/2] CBG-4862 force SendRev to check for errors - Create SendRevExpectConflict to explicitly test for when a conflict is expected. --- rest/access_test.go | 15 +- rest/adminapitest/admin_api_test.go | 3 +- rest/api_test.go | 2 +- rest/attachment_test.go | 11 +- rest/blip_api_attachment_test.go | 4 +- rest/blip_api_crud_test.go | 262 ++++++------------------ rest/blip_legacy_revid_test.go | 101 +++------- rest/importtest/import_test.go | 3 +- rest/sync_fn_test.go | 3 +- rest/user_api_test.go | 6 +- rest/utilities_testing.go | 274 +++++++++++--------------- rest/utilities_testing_blip_client.go | 6 +- 12 files changed, 224 insertions(+), 466 deletions(-) diff --git a/rest/access_test.go b/rest/access_test.go index 7a93c67a85..e7e08710a3 100644 --- a/rest/access_test.go +++ b/rest/access_test.go @@ -806,15 +806,11 @@ func TestChannelAccessChanges(t *testing.T) { assert.Equal(t, "g1", changes.Results[0].ID) // Look up sequences for created docs - deltaGrantDocSeq, err := rt.SequenceForDoc("delta") - assert.NoError(t, err, "Error retrieving document sequence") - gammaGrantDocSeq, err := rt.SequenceForDoc("gamma") - assert.NoError(t, err, "Error retrieving document sequence") + deltaGrantDocSeq := rt.SequenceForDoc("delta") + gammaGrantDocSeq := rt.SequenceForDoc("gamma") - alphaDocSeq, err := rt.SequenceForDoc("a1") - assert.NoError(t, err, "Error retrieving document sequence") - gammaDocSeq, err := rt.SequenceForDoc("g1") - assert.NoError(t, err, "Error retrieving document sequence") + alphaDocSeq := rt.SequenceForDoc("a1") + gammaDocSeq := rt.SequenceForDoc("g1") // Check user access: alice, _ = a.GetUser("alice") @@ -843,8 +839,7 @@ func TestChannelAccessChanges(t *testing.T) { response = rt.SendRequest(http.MethodPut, "/{{.keyspace}}/alpha", str) RequireStatus(t, response, http.StatusCreated) - alphaGrantDocSeq, err := rt.SequenceForDoc("alpha") - assert.NoError(t, err, "Error retrieving document sequence") + alphaGrantDocSeq := rt.SequenceForDoc("alpha") // Check user access again: alice, _ = a.GetUser("alice") diff --git a/rest/adminapitest/admin_api_test.go b/rest/adminapitest/admin_api_test.go index 52683aa996..7fab479198 100644 --- a/rest/adminapitest/admin_api_test.go +++ b/rest/adminapitest/admin_api_test.go @@ -457,8 +457,7 @@ func TestDBOfflineConcurrent(t *testing.T) { wg.Done() }() - err := rest.WaitWithTimeout(&wg, time.Second*30) - assert.NoError(t, err, "Error waiting for waitgroup") + rest.WaitWithTimeout(t, &wg, time.Second*30) rest.RequireStatus(t, goroutineresponse1, http.StatusOK) rest.RequireStatus(t, goroutineresponse2, http.StatusOK) diff --git a/rest/api_test.go b/rest/api_test.go index b16eb84ba6..26314cae0f 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -2052,7 +2052,7 @@ func TestWebhookProperties(t *testing.T) { assert.NoError(t, err) } - require.NoError(t, WaitWithTimeout(&wg, 30*time.Second)) + WaitWithTimeout(t, &wg, 30*time.Second) } func TestWebhookSpecialProperties(t *testing.T) { diff --git a/rest/attachment_test.go b/rest/attachment_test.go index 73884e3b53..a4acf00d9f 100644 --- a/rest/attachment_test.go +++ b/rest/attachment_test.go @@ -2464,9 +2464,7 @@ func TestProveAttachmentNotFound(t *testing.T) { } // Initial set up - sent, _, _, err := bt.SendRev("doc1", "1-abc", []byte(`{"key": "val", "_attachments": {"attachment": {"data": "`+attachmentDataEncoded+`"}}}`), blip.Properties{}) - require.True(t, 
sent) - require.NoError(t, err) + bt.SendRev("doc1", "1-abc", []byte(`{"key": "val", "_attachments": {"attachment": {"data": "`+attachmentDataEncoded+`"}}}`), blip.Properties{}) rt.WaitForPendingChanges() @@ -2476,9 +2474,7 @@ func TestProveAttachmentNotFound(t *testing.T) { // Use different attachment name to bypass digest check in ForEachStubAttachment() which skips prove attachment code // Set attachment to V2 so it can be retrieved by RT successfully - sent, _, _, err = bt.SendRev("doc1", "2-abc", []byte(`{"key": "val", "_attachments":{"attach":{"digest":"sha1-wzp8ZyykdEuZ9GuqmxQ7XDrY7Co=","length":11,"stub":true,"revpos":1,"ver":2}}}`), blip.Properties{}) - require.True(t, sent) - require.NoError(t, err) + bt.SendRev("doc1", "2-abc", []byte(`{"key": "val", "_attachments":{"attach":{"digest":"sha1-wzp8ZyykdEuZ9GuqmxQ7XDrY7Co=","length":11,"stub":true,"revpos":1,"ver":2}}}`), blip.Properties{}) rt.WaitForPendingChanges() // Check attachment is on the document @@ -2550,8 +2546,7 @@ func TestPutInvalidAttachment(t *testing.T) { attachmentBody: test.invalidAttachmentBody, attachmentDigest: digest, } - sent, _, resp := bt.SendRevWithAttachment(input) - assert.True(t, sent) + resp := bt.SendRevWithAttachment(input) // Make sure we get the expected response back assert.Equal(t, test.expectedType, resp.Type()) diff --git a/rest/blip_api_attachment_test.go b/rest/blip_api_attachment_test.go index 091c7b3952..afae7d1152 100644 --- a/rest/blip_api_attachment_test.go +++ b/rest/blip_api_attachment_test.go @@ -472,8 +472,8 @@ func TestPutAttachmentViaBlipGetViaBlip(t *testing.T) { attachmentBody: attachmentBody, attachmentDigest: digest, } - sent, _, _ := bt.SendRevWithAttachment(input) - assert.True(t, sent) + resp := bt.SendRevWithAttachment(input) + require.NotContains(t, resp.Properties, "Error-Code") // Get all docs and attachment via subChanges request allDocs := bt.WaitForNumDocsViaChanges(1) diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 70988e1819..76fad1a1d5 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -71,13 +71,12 @@ func TestBlipPushRevisionInspectChanges(t *testing.T) { assert.Len(t, changeRow, 0) // Should be empty, meaning the server is saying it doesn't have the revision yet // Send the doc revision in a rev request - _, _, revResponse, err := bt.SendRev( + revResponse := bt.SendRev( "foo", "1-abc", []byte(`{"key": "val"}`), blip.Properties{}, ) - assert.NoError(t, err) _, err = revResponse.Body() assert.NoError(t, err, "Error unmarshalling response body") @@ -151,8 +150,7 @@ func TestBlipPushRevisionInspectChanges(t *testing.T) { assert.Equal(t, subChangesRequest.SerialNumber(), subChangesResponse.SerialNumber()) // Wait until we got the expected callback on the "changes" profile handler - timeoutErr := WaitWithTimeout(&receivedChangesRequestWg, time.Second*5) - assert.NoError(t, timeoutErr, "Timed out waiting") + WaitWithTimeout(t, &receivedChangesRequestWg, time.Second*5) } // Start subChanges w/ continuous=true, batchsize=10 @@ -244,21 +242,16 @@ func TestContinuousChangesSubscription(t *testing.T) { for i := 1; i < 1500; i++ { // // Add a change: Send an unsolicited doc revision in a rev request receivedChangesWg.Add(1) - _, _, revResponse, err := bt.SendRev( + bt.SendRev( fmt.Sprintf("foo-%d", i), "1-abc", []byte(`{"key": "val"}`), blip.Properties{}, ) - require.NoError(t, err) - - _, err = revResponse.Body() - assert.NoError(t, err, "Error unmarshalling response body") - } // Wait until all expected changes are 
received by change handler - require.NoError(t, WaitWithTimeout(&receivedChangesWg, time.Second*30)) + WaitWithTimeout(t, &receivedChangesWg, time.Second*30) // Since batch size was set to 10, and 15 docs were added, expect at _least_ 2 batches numBatchesReceivedSnapshot := atomic.LoadInt32(&numbatchesReceived) @@ -290,7 +283,6 @@ func TestBlipOneShotChangesSubscription(t *testing.T) { // When this test sends subChanges, Sync Gateway will send a changes request that must be handled lastReceivedSeq := float64(0) var numbatchesReceived int32 - nonIntegerSequenceReceived := false bt.blipContext.HandlerForProfile["changes"] = func(request *blip.Message) { body, err := request.Body() @@ -313,13 +305,9 @@ func TestBlipOneShotChangesSubscription(t *testing.T) { // Make sure sequence numbers are monotonically increasing receivedSeq, ok := change[0].(float64) - if ok { - assert.True(t, receivedSeq > lastReceivedSeq) - lastReceivedSeq = receivedSeq - } else { - nonIntegerSequenceReceived = true - log.Printf("Unexpected non-integer sequence received: %v", change[0]) - } + require.True(t, ok, "Expected sequence to be float64, was %T", change[0]) + assert.True(t, receivedSeq > lastReceivedSeq) + lastReceivedSeq = receivedSeq // Verify doc id and rev id have expected vals docID := change[1].(string) @@ -354,15 +342,12 @@ func TestBlipOneShotChangesSubscription(t *testing.T) { // Add documents for docID := range docIdsReceived { // // Add a change: Send an unsolicited doc revision in a rev request - _, _, revResponse, err := bt.SendRev( + bt.SendRev( docID, "1-abc", []byte(`{"key": "val"}`), blip.Properties{}, ) - require.NoError(t, err) - _, err = revResponse.Body() - assert.NoError(t, err, "Error unmarshalling response body") receivedChangesWg.Add(1) } @@ -381,9 +366,7 @@ func TestBlipOneShotChangesSubscription(t *testing.T) { assert.Equal(t, subChangesRequest.SerialNumber(), subChangesResponse.SerialNumber()) // Wait until all expected changes are received by change handler - // receivedChangesWg.Wait() - timeoutErr := WaitWithTimeout(&receivedChangesWg, time.Second*60) - assert.NoError(t, timeoutErr, "Timed out waiting for all changes.") + WaitWithTimeout(t, &receivedChangesWg, time.Second*60) // Since batch size was set to 10, and 15 docs were added, expect at _least_ 2 batches numBatchesReceivedSnapshot := atomic.LoadInt32(&numbatchesReceived) @@ -399,30 +382,8 @@ func TestBlipOneShotChangesSubscription(t *testing.T) { // Validate that the 'caught up' message was sent assert.True(t, receivedCaughtUpChange) - // Create a few more changes, validate that they aren't sent (subChanges has been closed). - // Validated by the prefix matching in the subChanges callback, as well as waitgroup check below. 
- for i := 0; i < 5; i++ { - // // Add a change: Send an unsolicited doc revision in a rev request - _, _, revResponse, err := bt.SendRev( - fmt.Sprintf("postOneShot_%d", i), - "1-abc", - []byte(`{"key": "val"}`), - blip.Properties{}, - ) - require.NoError(t, err) - _, err = revResponse.Body() - assert.NoError(t, err, "Error unmarshalling response body") - receivedChangesWg.Add(1) - } - - // Wait long enough to ensure the changes aren't being sent - expectedTimeoutErr := WaitWithTimeout(&receivedChangesWg, time.Second*1) - if expectedTimeoutErr == nil { - t.Errorf("Received additional changes after one-shot should have been closed.") - } - - // Validate integer sequences - assert.False(t, nonIntegerSequenceReceived, "Unexpected non-integer sequence seen.") + require.Equal(t, int64(1), bt.restTester.GetDatabase().DbStats.CBLReplicationPullStats.NumPullReplTotalOneShot.Value()) + require.Equal(t, int64(0), bt.restTester.GetDatabase().DbStats.CBLReplicationPullStats.NumPullReplActiveOneShot.Value()) } // Test subChanges w/ docID filter @@ -453,7 +414,6 @@ func TestBlipSubChangesDocIDFilter(t *testing.T) { // When this test sends subChanges, Sync Gateway will send a changes request that must be handled lastReceivedSeq := float64(0) var numbatchesReceived int32 - nonIntegerSequenceReceived := false bt.blipContext.HandlerForProfile["changes"] = func(request *blip.Message) { @@ -477,13 +437,9 @@ func TestBlipSubChangesDocIDFilter(t *testing.T) { // Make sure sequence numbers are monotonically increasing receivedSeq, ok := change[0].(float64) - if ok { - assert.True(t, receivedSeq > lastReceivedSeq) - lastReceivedSeq = receivedSeq - } else { - nonIntegerSequenceReceived = true - log.Printf("Unexpected non-integer sequence received: %v", change[0]) - } + require.True(t, ok, "Expected sequence to be float64, was %T", change[0]) + assert.True(t, receivedSeq > lastReceivedSeq) + lastReceivedSeq = receivedSeq // Verify doc id and rev id have expected vals docID := change[1].(string) @@ -528,15 +484,12 @@ func TestBlipSubChangesDocIDFilter(t *testing.T) { // Add documents for _, docID := range docIDsSent { // // Add a change: Send an unsolicited doc revision in a rev request - _, _, revResponse, err := bt.SendRev( + bt.SendRev( docID, "1-abc", []byte(`{"key": "val"}`), blip.Properties{}, ) - assert.NoError(t, err) - _, err = revResponse.Body() - assert.NoError(t, err, "Error unmarshalling response body") } receivedChangesWg.Add(len(docIDsExpected)) @@ -565,13 +518,13 @@ func TestBlipSubChangesDocIDFilter(t *testing.T) { assert.Equal(t, subChangesRequest.SerialNumber(), subChangesResponse.SerialNumber()) // Wait until all expected changes are received by change handler - // receivedChangesWg.Wait() - timeoutErr := WaitWithTimeout(&receivedChangesWg, time.Second*15) - assert.NoError(t, timeoutErr, "Timed out waiting for all changes.") + WaitWithTimeout(t, &receivedChangesWg, time.Second*15) // Since batch size was set to 10, and 15 docs were added, expect at _least_ 2 batches numBatchesReceivedSnapshot := atomic.LoadInt32(&numbatchesReceived) assert.True(t, numBatchesReceivedSnapshot >= 2) + // Validate that the 'caught up' message was sent + assert.True(t, receivedCaughtUpChange) // Validate all expected documents were received. 
for docID, received := range docIDsReceived { @@ -583,8 +536,6 @@ func TestBlipSubChangesDocIDFilter(t *testing.T) { // Validate that the 'caught up' message was sent assert.True(t, receivedCaughtUpChange) - // Validate integer sequences - assert.False(t, nonIntegerSequenceReceived, "Unexpected non-integer sequence seen.") } // Push proposed changes and ensure that the server accepts them @@ -765,13 +716,12 @@ func TestPublicPortAuthentication(t *testing.T) { defer btUser1.Close() // Send the user1 doc - _, _, _, err := btUser1.SendRev( + btUser1.SendRev( "foo", "1-abc", []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{}, ) - require.NoError(t, err, "Error sending revision") btUser2 := NewBlipTesterFromSpecWithRT(rt, &BlipTesterSpec{ connectingUsername: user2, @@ -779,13 +729,12 @@ func TestPublicPortAuthentication(t *testing.T) { defer btUser2.Close() // Send the user2 doc, which is in a "random" channel, but it should be accessible due to * channel access - _, _, _, err = btUser2.SendRev( + btUser2.SendRev( "foo2", "1-abcd", []byte(`{"key": "val", "channels": ["NBC"]}`), blip.Properties{}, ) - require.NoError(t, err, "Error sending revision") // Assert that user1 received a single expected change changesChannelUser1 := btUser1.WaitForNumChanges(1) @@ -831,13 +780,9 @@ function(doc, oldDoc) { defer bt.Close() // Attempt to send a doc, should be rejected - _, _, _, sendErr := bt.SendRev( - "foo", - "1-abc", - []byte(`{"key": "val"}`), - blip.Properties{}, - ) - assert.Error(t, sendErr, "Expected error sending rev (403 sg missing channel access)") + rq := bt.newRevMessage("foo", "1-abc", []byte(`{"key": "val"}`), blip.Properties{}) + bt.Send(rq) + require.Equal(t, "403", rq.Response().Properties["Error-Code"], "Expected 403 error code on rev send when user has no channel access, %s", rq.Properties["Error-Code"]) // Set up a ChangeWaiter for this test, to block until the user change notification happens dbc := rt.GetDatabase() @@ -858,13 +803,12 @@ function(doc, oldDoc) { require.True(t, db.WaitForUserWaiterChange(userWaiter)) // Attempt to send the doc again, should succeed if the blip context also received notification - _, _, _, sendErr = bt.SendRev( + bt.SendRev( "foo", "1-abc", []byte(`{"key": "val"}`), blip.Properties{}, ) - assert.NoError(t, sendErr) // Validate that the doc was written (GET request doesn't get a 404) getResponse := rt.SendAdminRequest("GET", "/{{.keyspace}}/foo", "") @@ -991,11 +935,8 @@ function(doc, oldDoc) { rt.WaitForPendingChanges() // Wait until all expected changes are received by change handler - timeoutErr := WaitWithTimeout(&receivedChangesWg, time.Second*5) - assert.NoError(t, timeoutErr, "Timed out waiting for all changes.") - - revTimeoutErr := WaitWithTimeout(&revsFinishedWg, time.Second*5) - assert.NoError(t, revTimeoutErr, "Timed out waiting for all revs.") + WaitWithTimeout(t, &receivedChangesWg, time.Second*5) + WaitWithTimeout(t, &revsFinishedWg, time.Second*5) assert.False(t, nonIntegerSequenceReceived, "Unexpected non-integer sequence seen.") @@ -1115,13 +1056,12 @@ function(doc, oldDoc) { // Simulate sending docs from the client receivedChangesWg.Add(100) revsFinishedWg.Add(100) - beforeChangesSent := time.Now().UnixMilli() // Sending revs may take a while if using views (GSI=false) due to the CBS views engine taking a while to execute the queries // regarding rebuilding the users access grants (due to the constant invalidation of this). 
// This blip tester is running as the user so the users access grants are rebuilt instantly when invalidated instead of the usual lazy-loading. for i := 0; i < 100; i++ { docID := fmt.Sprintf("foo_%d", i) - _, _, _, sendErr := bt.SendRev( + bt.SendRev( docID, "1-abc", []byte(`{"accessUser": "user1", @@ -1129,16 +1069,11 @@ function(doc, oldDoc) { "channels":["`+docID+`"]}`), blip.Properties{}, ) - require.NoError(t, sendErr) } // Wait until all expected changes are received by change handler - timeoutErr := WaitWithTimeout(&receivedChangesWg, time.Second*30) - assert.NoError(t, timeoutErr, "Timed out waiting for all changes.") - fmt.Println("Revs sent and changes received in", time.Now().UnixMilli()-beforeChangesSent, "ms") - - revTimeoutErr := WaitWithTimeout(&revsFinishedWg, time.Second*30) - assert.NoError(t, revTimeoutErr, "Timed out waiting for all revs.") + WaitWithTimeout(t, &receivedChangesWg, time.Second*30) + WaitWithTimeout(t, &revsFinishedWg, time.Second*30) assert.False(t, nonIntegerSequenceReceived, "Unexpected non-integer sequence seen.") @@ -1162,10 +1097,7 @@ func TestBlipSendAndGetRev(t *testing.T) { defer bt.Close() // Send non-deleted rev - sent, _, resp, err := bt.SendRev("sendAndGetRev", "1-abc", []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{}) - assert.True(t, sent) - assert.NoError(t, err) - assert.Equal(t, "", resp.Properties["Error-Code"]) + bt.SendRev("sendAndGetRev", "1-abc", []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{}) // Get non-deleted rev response := bt.restTester.SendAdminRequest("GET", "/{{.keyspace}}/sendAndGetRev?rev=1-abc", "") @@ -1177,16 +1109,14 @@ func TestBlipSendAndGetRev(t *testing.T) { // Tombstone the document history := []string{"1-abc"} - sent, _, resp, err = bt.SendRevWithHistory("sendAndGetRev", "2-bcd", history, []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{"deleted": "true"}) - assert.True(t, sent) - assert.NoError(t, err) - assert.Equal(t, "", resp.Properties["Error-Code"]) + bt.SendRevWithHistory("sendAndGetRev", "2-bcd", history, []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{"deleted": "true"}) // Get the tombstoned document response = bt.restTester.SendAdminRequest("GET", "/{{.keyspace}}/sendAndGetRev?rev=2-bcd", "") RequireStatus(t, response, 200) responseBody = RestDocument{} assert.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &responseBody), "Error unmarshalling GET doc response") + require.Contains(t, responseBody, db.BodyDeleted) deletedValue, deletedOK := responseBody[db.BodyDeleted].(bool) assert.True(t, deletedOK) assert.True(t, deletedValue) @@ -1222,12 +1152,11 @@ func TestBlipSendConcurrentRevs(t *testing.T) { docID := fmt.Sprintf("%s-%d", t.Name(), i) go func() { defer wg.Done() - _, _, _, err := bt.SendRev(docID, "1-abc", []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{}) - require.NoError(t, err) + bt.SendRev(docID, "1-abc", []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{}) }() } - require.NoError(t, WaitWithTimeout(&wg, time.Second*30)) + WaitWithTimeout(t, &wg, time.Second*30) throttleCount := rt.GetDatabase().DbStats.CBLReplicationPush().WriteThrottledCount.Value() throttleTime := rt.GetDatabase().DbStats.CBLReplicationPush().WriteThrottledTime.Value() @@ -1258,10 +1187,7 @@ func TestBlipSendAndGetLargeNumberRev(t *testing.T) { defer bt.Close() // Send non-deleted rev - sent, _, resp, err := bt.SendRev("largeNumberRev", "1-abc", []byte(`{"key": "val", "largeNumber":9223372036854775807, "channels": 
["user1"]}`), blip.Properties{}) - assert.True(t, sent) - assert.NoError(t, err) - assert.Equal(t, "", resp.Properties["Error-Code"]) + bt.SendRev("largeNumberRev", "1-abc", []byte(`{"key": "val", "largeNumber":9223372036854775807, "channels": ["user1"]}`), blip.Properties{}) // Get non-deleted rev response := bt.restTester.SendAdminRequest("GET", "/{{.keyspace}}/largeNumberRev?rev=1-abc", "") @@ -1309,9 +1235,7 @@ func TestBlipSetCheckpoint(t *testing.T) { // Create new checkpoint checkpointBody := []byte(`{"client_seq":"1000"}`) - sent, _, resp, err := bt.SetCheckpoint("testclient", "", checkpointBody) - assert.True(t, sent) - assert.NoError(t, err) + resp := bt.SetCheckpoint("testclient", "", checkpointBody) assert.Equal(t, "", resp.Properties["Error-Code"]) checkpointRev := resp.Rev() @@ -1321,15 +1245,13 @@ func TestBlipSetCheckpoint(t *testing.T) { response := rt.SendAdminRequest("GET", "/{{.keyspace}}/_local/checkpoint%252Ftestclient", "") RequireStatus(t, response, 200) var responseBody map[string]interface{} - err = base.JSONUnmarshal(response.Body.Bytes(), &responseBody) + err := base.JSONUnmarshal(response.Body.Bytes(), &responseBody) require.NoError(t, err) assert.Equal(t, "1000", responseBody["client_seq"]) // Attempt to update the checkpoint with previous rev checkpointBody = []byte(`{"client_seq":"1005"}`) - sent, _, resp, err = bt.SetCheckpoint("testclient", checkpointRev, checkpointBody) - assert.True(t, sent) - assert.NoError(t, err) + resp = bt.SetCheckpoint("testclient", checkpointRev, checkpointBody) assert.Equal(t, "", resp.Properties["Error-Code"]) checkpointRev = resp.Rev() assert.Equal(t, "0-2", checkpointRev) @@ -1393,13 +1315,12 @@ func TestReloadUser(t *testing.T) { require.True(t, db.WaitForUserWaiterChange(userWaiter)) // Add a doc in the PBS channel - _, _, addRevResponse, err := bt.SendRev( + addRevResponse := bt.SendRev( "foo", "1-abc", []byte(`{"key": "val", "channels": ["PBS"]}`), blip.Properties{}, ) - assert.NoError(t, err) // Make assertions on response to make sure the change was accepted addRevResponseBody, err := addRevResponse.Body() @@ -1432,27 +1353,24 @@ func TestAccessGrantViaSyncFunction(t *testing.T) { defer bt.Close() // Add a doc in the PBS channel - _, _, _, err := bt.SendRev( + bt.SendRev( "foo", "1-abc", []byte(`{"key": "val", "channels": ["PBS"]}`), blip.Properties{}, ) - require.NoError(t, err) - // Put document that triggers access grant for user to channel PBS response := rt.SendAdminRequest("PUT", "/{{.keyspace}}/access1", `{"accessUser":"user1", "accessChannel":["PBS"]}`) RequireStatus(t, response, 201) // Add another doc in the PBS channel - _, _, _, err = bt.SendRev( + bt.SendRev( "foo2", "1-abc", []byte(`{"key": "val", "channels": ["PBS"]}`), blip.Properties{}, ) - require.NoError(t, err) // Make sure we can see it by getting changes changes := bt.WaitForNumChanges(2) @@ -1475,7 +1393,7 @@ func TestAccessGrantViaAdminApi(t *testing.T) { dataStore := bt.restTester.GetSingleDataStore() // Add a doc in the PBS channel - _, _, _, _ = bt.SendRev( + bt.SendRev( "foo", "1-abc", []byte(`{"key": "val", "channels": ["PBS"]}`), @@ -1487,7 +1405,7 @@ func TestAccessGrantViaAdminApi(t *testing.T) { RequireStatus(t, response, 200) // Add another doc in the PBS channel - _, _, _, _ = bt.SendRev( + bt.SendRev( "foo2", "1-abc", []byte(`{"key": "val", "channels": ["PBS"]}`), @@ -1655,21 +1573,11 @@ func TestPutRevNoConflictsMode(t *testing.T) { }) defer bt.Close() - sent, _, resp, err := bt.SendRev("foo", "1-abc", []byte(`{"key": "val"}`), 
blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) // no error - assert.Equal(t, "", resp.Properties["Error-Code"]) // no error - - sent, _, resp, err = bt.SendRev("foo", "1-def", []byte(`{"key": "val"}`), blip.Properties{"noconflicts": "true"}) - assert.True(t, sent) - assert.NotEqual(t, nil, err) // conflict error - assert.Equal(t, "409", resp.Properties["Error-Code"]) // conflict - - sent, _, resp, err = bt.SendRev("foo", "1-ghi", []byte(`{"key": "val"}`), blip.Properties{"noconflicts": "false"}) - assert.True(t, sent) - assert.NotEqual(t, nil, err) // conflict error - assert.Equal(t, "409", resp.Properties["Error-Code"]) // conflict + bt.SendRev("foo", "1-abc", []byte(`{"key": "val"}`), blip.Properties{}) + // expected failure + bt.SendRevExpectConflict("foo", "1-def", []byte(`{"key": "val"}`), blip.Properties{"noconflicts": "true"}) + bt.SendRevExpectConflict("foo", "1-ghi", []byte(`{"key": "val"}`), blip.Properties{"noconflicts": "false"}) } func TestPutRevConflictsMode(t *testing.T) { @@ -1682,21 +1590,11 @@ func TestPutRevConflictsMode(t *testing.T) { bt.restTester.GetDatabase().EnableAllowConflicts(bt.TB()) - sent, _, resp, err := bt.SendRev("foo", "1-abc", []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) // no error - assert.Equal(t, "", resp.Properties["Error-Code"]) // no error + bt.SendRev("foo", "1-abc", []byte(`{"key": "val"}`), blip.Properties{}) - sent, _, resp, err = bt.SendRev("foo", "1-def", []byte(`{"key": "val"}`), blip.Properties{"noconflicts": "false"}) - assert.True(t, sent) - assert.NoError(t, err) // no error - assert.Equal(t, "", resp.Properties["Error-Code"]) // no error - - sent, _, resp, err = bt.SendRev("foo", "1-ghi", []byte(`{"key": "val"}`), blip.Properties{"noconflicts": "true"}) - assert.True(t, sent) - assert.NotEqual(t, nil, err) // conflict error - assert.Equal(t, "409", resp.Properties["Error-Code"]) // conflict + bt.SendRev("foo", "1-def", []byte(`{"key": "val"}`), blip.Properties{"noconflicts": "false"}) + bt.SendRevExpectConflict("foo", "1-ghi", []byte(`{"key": "val"}`), blip.Properties{"noconflicts": "true"}) } // TestPutRevV4: @@ -1719,10 +1617,7 @@ func TestPutRevV4(t *testing.T) { // 1. Send rev with history history := "1@b, 2@a" - sent, _, resp, err := bt.SendRev(docID, "3@c", []byte(`{"key": "val"}`), blip.Properties{"history": history}) - assert.True(t, sent) - require.NoError(t, err) - assert.Equal(t, "", resp.Properties["Error-Code"]) + bt.SendRev(docID, "3@c", []byte(`{"key": "val"}`), blip.Properties{"history": history}) // Validate against the bucket doc's HLV doc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalNoHistory) @@ -1735,10 +1630,7 @@ func TestPutRevV4(t *testing.T) { }, *doc.HLV) // 2. Update the document with a non-conflicting revision, where only cv is updated - sent, _, resp, err = bt.SendRev(docID, "4@c", []byte(`{"key": "val"}`), blip.Properties{"history": history}) - assert.True(t, sent) - require.NoError(t, err) - assert.Equal(t, "", resp.Properties["Error-Code"]) + bt.SendRev(docID, "4@c", []byte(`{"key": "val"}`), blip.Properties{"history": history}) // Validate against the bucket doc's HLV doc, _, err = collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalNoHistory) @@ -1752,10 +1644,7 @@ func TestPutRevV4(t *testing.T) { // 3. 
Update the document again with a non-conflicting revision from a different source (previous cv moved to pv) updatedHistory := "1@b, 2@a, 4@c" - sent, _, resp, err = bt.SendRev(docID, "1@d", []byte(`{"key": "val"}`), blip.Properties{"history": updatedHistory}) - assert.True(t, sent) - require.NoError(t, err) - assert.Equal(t, "", resp.Properties["Error-Code"]) + bt.SendRev(docID, "1@d", []byte(`{"key": "val"}`), blip.Properties{"history": updatedHistory}) // Validate against the bucket doc's HLV doc, _, err = collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalNoHistory) @@ -1769,10 +1658,7 @@ func TestPutRevV4(t *testing.T) { // 4. Update the document again with a non-conflicting revision from a different source, and additional sources in history (previous cv moved to pv, and pv expanded) updatedHistory = "1@b, 2@a, 4@c, 1@d" - sent, _, resp, err = bt.SendRev(docID, "1@e", []byte(`{"key": "val"}`), blip.Properties{"history": updatedHistory}) - assert.True(t, sent) - require.NoError(t, err) - assert.Equal(t, "", resp.Properties["Error-Code"]) + bt.SendRev(docID, "1@e", []byte(`{"key": "val"}`), blip.Properties{"history": updatedHistory}) // Validate against the bucket doc's HLV doc, _, err = collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalNoHistory) @@ -1785,18 +1671,12 @@ func TestPutRevV4(t *testing.T) { }, *doc.HLV) // 5. Attempt to update the document again with a conflicting revision from a different source (previous cv not in pv), expect conflict - sent, _, resp, err = bt.SendRev(docID, db.EncodeTestVersion("1@pqr"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) - assert.True(t, sent) - require.Error(t, err) - assert.Equal(t, "409", resp.Properties["Error-Code"]) + bt.SendRevExpectConflict(docID, db.EncodeTestVersion("1@pqr"), []byte(`{"key": "val"}`), blip.Properties{"history": db.EncodeTestHistory(updatedHistory)}) // 6. 
Test sending rev with merge versions included in history (note new key) newDocID := t.Name() + "_2" mvHistory := "3@d, 3@e; 1@b, 2@a" - sent, _, resp, err = bt.SendRev(newDocID, "3@c", []byte(`{"key": "val"}`), blip.Properties{"history": mvHistory}) - assert.True(t, sent) - require.NoError(t, err) - assert.Equal(t, "", resp.Properties["Error-Code"]) + bt.SendRev(newDocID, "3@c", []byte(`{"key": "val"}`), blip.Properties{"history": mvHistory}) // assert on bucket doc doc, _, err = collection.GetDocWithXattrs(ctx, newDocID, db.DocUnmarshalNoHistory) @@ -1853,17 +1733,11 @@ func TestGetRemovedDoc(t *testing.T) { defer bt2.Close() // Add rev-1 in channel user1 - sent, _, resp, err := bt.SendRev("foo", "1-abc", []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) // no error - assert.Empty(t, resp.Properties["Error-Code"]) // no error + bt.SendRev("foo", "1-abc", []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{}) // Add rev-2 in channel user1 history := []string{"1-abc"} - sent, _, resp, err = bt.SendRevWithHistory("foo", "2-bcd", history, []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{"noconflicts": "true"}) - assert.True(t, sent) - require.NoError(t, err) // no error - assert.Empty(t, resp.Properties["Error-Code"]) // no error + bt.SendRevWithHistory("foo", "2-bcd", history, []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{"noconflicts": "true"}) // wait for rev 2 to arrive to cache cache rt.WaitForPendingChanges() @@ -1875,17 +1749,11 @@ func TestGetRemovedDoc(t *testing.T) { // Add rev-3, remove from channel user1 and put into channel another_channel history = []string{"2-bcd", "1-abc"} - sent, _, resp, err = bt.SendRevWithHistory("foo", "3-cde", history, []byte(`{"key": "val", "channels": ["another_channel"]}`), blip.Properties{"noconflicts": "true"}) - assert.True(t, sent) - assert.NoError(t, err) // no error - assert.Empty(t, resp.Properties["Error-Code"]) // no error + bt.SendRevWithHistory("foo", "3-cde", history, []byte(`{"key": "val", "channels": ["another_channel"]}`), blip.Properties{"noconflicts": "true"}) // Add rev-4, keeping it in channel another_channel history = []string{"3-cde", "2-bcd", "1-abc"} - sent, _, resp, err = bt.SendRevWithHistory("foo", "4-def", history, []byte("{}"), blip.Properties{"noconflicts": "true", "deleted": "true"}) - assert.True(t, sent) - assert.NoError(t, err) // no error - assert.Empty(t, resp.Properties["Error-Code"]) // no error + bt.SendRevWithHistory("foo", "4-def", history, []byte("{}"), blip.Properties{"noconflicts": "true", "deleted": "true"}) rt.WaitForPendingChanges() @@ -1930,9 +1798,7 @@ func TestMissingNoRev(t *testing.T) { for i := 0; i < 5; i++ { docID := fmt.Sprintf("doc-%d", i) docRev := fmt.Sprintf("1-abc%d", i) - sent, _, resp, err := bt.SendRev(docID, docRev, []byte(`{"key": "val", "channels": ["ABC"]}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err, "resp is %s", resp) + bt.SendRev(docID, docRev, []byte(`{"key": "val", "channels": ["ABC"]}`), blip.Properties{}) } // Pull docs, expect to pull 5 docs since none of them has purged yet. 
@@ -3349,21 +3215,19 @@ func TestPutRevBlip(t *testing.T) { bt := NewBlipTesterFromSpec(t, BlipTesterSpec{GuestEnabled: true, blipProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}}) defer bt.Close() - _, _, _, err := bt.SendRev( + bt.SendRev( "foo", "2@stZPWD8vS/O3nsx9yb2Brw", []byte(`{"key": "val"}`), blip.Properties{}, ) - require.NoError(t, err) - _, _, _, err = bt.SendRev( + bt.SendRev( "foo", "fa1@stZPWD8vS/O3nsx9yb2Brw", []byte(`{"key": "val2"}`), blip.Properties{}, ) - require.NoError(t, err) } func TestBlipMergeVersions(t *testing.T) { diff --git a/rest/blip_legacy_revid_test.go b/rest/blip_legacy_revid_test.go index aba69d4e23..885c313d5b 100644 --- a/rest/blip_legacy_revid_test.go +++ b/rest/blip_legacy_revid_test.go @@ -261,9 +261,7 @@ func TestProcessLegacyRev(t *testing.T) { // Send another rev of same doc history := []string{rev1ID} - sent, _, _, err := bt.SendRevWithHistory("doc1", "2-bcd", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - assert.NoError(t, err) + bt.SendRevWithHistory("doc1", "2-bcd", history, []byte(`{"key": "val"}`), blip.Properties{}) rt.WaitForVersion("doc1", DocVersion{RevTreeID: "2-bcd"}) // assert we can fetch this doc rev @@ -285,13 +283,12 @@ func TestProcessLegacyRev(t *testing.T) { }) // try new rev to process - _, _, _, err = bt.SendRev( + bt.SendRev( "foo", "1-abc", []byte(`{"key": "val"}`), blip.Properties{}, ) - assert.NoError(t, err) rt.WaitForVersion("foo", DocVersion{RevTreeID: "1-abc"}) // assert we can fetch this doc rev @@ -350,9 +347,7 @@ func TestProcessRevWithLegacyHistory(t *testing.T) { // Have CBL send an update to that doc, with history in revTreeID format history := []string{rev1ID} - sent, _, _, err := bt.SendRevWithHistory(docID, "1000@CBL1", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRevWithHistory(docID, "1000@CBL1", history, []byte(`{"key": "val"}`), blip.Properties{}) // assert that the bucket doc is as expected bucketDoc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalAll) @@ -370,9 +365,7 @@ func TestProcessRevWithLegacyHistory(t *testing.T) { // Have CBL send an update to that doc, with history in HLV + revTreeID format history = []string{"1000@CBL2", rev1ID} - sent, _, _, err = bt.SendRevWithHistory(docID2, "1001@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRevWithHistory(docID2, "1001@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) // assert that the bucket doc is as expected bucketDoc, _, err = collection.GetDocWithXattrs(ctx, docID2, db.DocUnmarshalAll) @@ -390,9 +383,7 @@ func TestProcessRevWithLegacyHistory(t *testing.T) { rt.GetDatabase().FlushRevisionCacheForTest() history = []string{"1000@CBL2", "2-abc", rev1ID} - sent, _, _, err = bt.SendRevWithHistory(docID3, "1010@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRevWithHistory(docID3, "1010@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) // assert that the bucket doc is as expected bucketDoc, _, err = collection.GetDocWithXattrs(ctx, docID3, db.DocUnmarshalAll) @@ -404,9 +395,7 @@ func TestProcessRevWithLegacyHistory(t *testing.T) { // 4. 
CBL sends rev=1010@CBL1, history=1-abc when SGW does not have the doc (document underwent multiple legacy and p2p updates before being pushed to SGW) history = []string{"1000@CBL2", "1-abc"} - sent, _, _, err = bt.SendRevWithHistory(docID4, "1010@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRevWithHistory(docID4, "1010@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) // assert that the bucket doc is as expected bucketDoc, _, err = collection.GetDocWithXattrs(ctx, docID4, db.DocUnmarshalAll) @@ -428,9 +417,7 @@ func TestProcessRevWithLegacyHistory(t *testing.T) { } history = []string{rev2ID} - sent, _, _, err = bt.SendRevWithHistory(docID5, pushedRev.String(), history, []byte(`{"some": "update"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRevWithHistory(docID5, pushedRev.String(), history, []byte(`{"some": "update"}`), blip.Properties{}) // assert that the bucket doc is as expected bucketDoc, _, err = collection.GetDocWithXattrs(ctx, docID5, db.DocUnmarshalAll) @@ -453,9 +440,7 @@ func TestProcessRevWithLegacyHistory(t *testing.T) { SourceID: "CBL1", } history = []string{"3-abc", "2-abc", rev1ID} - sent, _, _, err = bt.SendRevWithHistory(docID6, pushedRev.String(), history, []byte(`{"some": "update"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRevWithHistory(docID6, pushedRev.String(), history, []byte(`{"some": "update"}`), blip.Properties{}) // assert that the bucket doc is as expected bucketDoc, _, err = collection.GetDocWithXattrs(ctx, docID6, db.DocUnmarshalAll) @@ -503,10 +488,8 @@ func TestProcessRevWithLegacyHistoryConflict(t *testing.T) { require.NoError(t, ds.RemoveXattrs(base.TestCtx(t), docID, []string{base.VvXattrName}, docVersion.CV.Value)) rt.GetDatabase().FlushRevisionCacheForTest() - history := []string{rev2ID, rev1ID} - sent, _, _, err := bt.SendRevWithHistory(docID, "3-abc", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.ErrorContains(t, err, "Document revision conflict") + bt.SendRevExpectConflict(docID, "3-abc", []byte(`{"key": "val"}`), + blip.Properties{db.RevMessageHistory: strings.Join([]string{rev2ID, rev1ID}, ",")}) // 2. same as above but not having the rev be legacy on SGW side (don't remove the hlv) docVersion = rt.PutDoc(docID2, `{"test": "doc"}`) @@ -517,10 +500,8 @@ func TestProcessRevWithLegacyHistoryConflict(t *testing.T) { docVersion = rt.UpdateDoc(docID2, docVersion, `{"some": "update2"}`) - history = []string{rev2ID, rev1ID} - sent, _, _, err = bt.SendRevWithHistory(docID2, "3-abc", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.ErrorContains(t, err, "Document revision conflict") + bt.SendRevExpectConflict(docID2, "3-abc", []byte(`{"key": "val"}`), + blip.Properties{db.RevMessageHistory: strings.Join([]string{rev2ID, rev1ID}, ",")}) // 3. 
CBL sends rev=1010@CBL1, history=1000@CBL2,1-abc when SGW has current rev 2-abc (document underwent multiple p2p updates before being pushed to SGW) docVersion = rt.PutDoc(docID3, `{"test": "doc"}`) @@ -532,10 +513,8 @@ func TestProcessRevWithLegacyHistoryConflict(t *testing.T) { require.NoError(t, ds.RemoveXattrs(base.TestCtx(t), docID3, []string{base.VvXattrName}, docVersion.CV.Value)) rt.GetDatabase().FlushRevisionCacheForTest() - history = []string{"1000@CBL2", rev1ID} - sent, _, _, err = bt.SendRevWithHistory(docID3, "1010@CBL1", history, []byte(`{"some": "update"}`), blip.Properties{}) - assert.True(t, sent) - require.ErrorContains(t, err, "Document revision conflict") + bt.SendRevExpectConflict(docID3, "1010@CBL1", []byte(`{"key": "val"}`), + blip.Properties{db.RevMessageHistory: strings.Join([]string{"1000@CBL2", rev1ID}, ",")}) } // TestChangesResponseLegacyRev: @@ -630,11 +609,8 @@ func TestChangesResponseLegacyRev(t *testing.T) { subChangesResponse := subChangesRequest.Response() assert.Equal(t, subChangesRequest.SerialNumber(), subChangesResponse.SerialNumber()) - timeoutErr := WaitWithTimeout(&receivedChangesRequestWg, time.Second*10) - require.NoError(t, timeoutErr, "Timed out waiting") - - timeoutErr = WaitWithTimeout(&revsFinishedWg, time.Second*10) - require.NoError(t, timeoutErr, "Timed out waiting") + WaitWithTimeout(t, &receivedChangesRequestWg, time.Second*10) + WaitWithTimeout(t, &revsFinishedWg, time.Second*10) } @@ -737,11 +713,8 @@ func TestChangesResponseWithHLVInHistory(t *testing.T) { subChangesResponse := subChangesRequest.Response() assert.Equal(t, subChangesRequest.SerialNumber(), subChangesResponse.SerialNumber()) - timeoutErr := WaitWithTimeout(&receivedChangesRequestWg, time.Second*10) - require.NoError(t, timeoutErr, "Timed out waiting") - - timeoutErr = WaitWithTimeout(&revsFinishedWg, time.Second*10) - require.NoError(t, timeoutErr, "Timed out waiting") + WaitWithTimeout(t, &receivedChangesRequestWg, time.Second*10) + WaitWithTimeout(t, &revsFinishedWg, time.Second*10) } // TestCBLHasPreUpgradeMutationThatHasNotBeenReplicated: @@ -767,9 +740,7 @@ func TestCBLHasPreUpgradeMutationThatHasNotBeenReplicated(t *testing.T) { rt.GetDatabase().FlushRevisionCacheForTest() history := []string{rev1ID} - sent, _, _, err := bt.SendRevWithHistory("doc1", "2-abc", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRevWithHistory("doc1", "2-abc", history, []byte(`{"key": "val"}`), blip.Properties{}) // assert that the bucket doc is as expected bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) @@ -806,9 +777,7 @@ func TestCBLHasOfPreUpgradeMutationThatSGWAlreadyKnows(t *testing.T) { rt.GetDatabase().FlushRevisionCacheForTest() history := []string{rev1ID} - sent, _, _, err := bt.SendRevWithHistory("doc1", rev2ID, history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRevWithHistory("doc1", rev2ID, history, []byte(`{"key": "val"}`), blip.Properties{}) // assert that the bucket doc is as expected bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) @@ -844,9 +813,7 @@ func TestPushOfPostUpgradeMutationThatHasCommonAncestorToSGWVersion(t *testing.T rt.GetDatabase().FlushRevisionCacheForTest() // send 100@CBL1 - sent, _, _, err := bt.SendRevWithHistory("doc1", "100@CBL1", nil, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + 
bt.SendRevWithHistory("doc1", "100@CBL1", nil, []byte(`{"key": "val"}`), blip.Properties{}) bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) require.NoError(t, err) @@ -886,9 +853,8 @@ func TestPushDocConflictBetweenPreUpgradeCBLMutationAndPreUpgradeSGWMutation(t * // send rev 3-def history := []string{rev2ID, rev1ID} - sent, _, _, err := bt.SendRevWithHistory("doc1", "3-def", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.ErrorContains(t, err, "Document revision conflict") + bt.SendRevExpectConflict("doc1", "3-def", []byte(`{"key": "val"}`), blip.Properties{db.RevMessageHistory: strings.Join(history, ",")}) + //require.ErrorContains(t, err, "Document revision conflict") // assert that the bucket doc is as expected bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) @@ -923,9 +889,7 @@ func TestPushDocConflictBetweenPreUpgradeCBLMutationAndPostUpgradeSGWMutation(t // send rev 3-def history := []string{rev2ID, rev1ID} - sent, _, _, err := bt.SendRevWithHistory("doc1", "3-def", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.ErrorContains(t, err, "Document revision conflict") + bt.SendRevExpectConflict("doc1", "3-def", []byte(`{"key": "val"}`), blip.Properties{db.RevMessageHistory: strings.Join(history, ",")}) // assert that the bucket doc is as expected bucketDoc, _, err := collection.GetDocWithXattrs(ctx, "doc1", db.DocUnmarshalAll) @@ -959,9 +923,7 @@ func TestConflictBetweenPostUpgradeCBLMutationAndPostUpgradeSGWMutation(t *testi rev1ID := docVersion.RevTreeID history := []string{rev1ID} - sent, _, _, err := bt.SendRevWithHistory(docID, "100@CBL1", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRevWithHistory(docID, "100@CBL1", history, []byte(`{"key": "val"}`), blip.Properties{}) // assert that the bucket doc is as expected bucketDoc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalAll) @@ -974,10 +936,7 @@ func TestConflictBetweenPostUpgradeCBLMutationAndPostUpgradeSGWMutation(t *testi docVersion = rt.PutDoc(docID2, `{"some": "doc"}`) rev1ID = docVersion.RevTreeID - history = []string{"1-abc"} - sent, _, _, err = bt.SendRevWithHistory(docID2, "100@CBL1", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.ErrorContains(t, err, "Document revision conflict") + bt.SendRevExpectConflict(docID2, "100@CBL1", []byte(`{"key": "val"}`), blip.Properties{db.RevMessageHistory: "1-abc"}) // assert that the bucket doc is as expected bucketDoc, _, err = collection.GetDocWithXattrs(ctx, docID2, db.DocUnmarshalAll) @@ -1003,14 +962,10 @@ func TestLegacyRevNotInConflict(t *testing.T) { // have two history entries, 1 rev from a different CBL and 1 legacy rev, should generate conflict history := []string{"1-CBL2", "1-abc"} - sent, _, _, err := bt.SendRevWithHistory(docID, "100@CBL1", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.ErrorContains(t, err, "Document revision conflict") + bt.SendRevExpectConflict(docID, "100@CBL1", []byte(`{"key": "val"}`), blip.Properties{db.RevMessageHistory: strings.Join(history, ",")}) history = []string{docVersion.CV.String(), "1-abc"} - sent, _, _, err = bt.SendRevWithHistory(docID, "100@CBL1", history, []byte(`{"key": "val"}`), blip.Properties{}) - assert.True(t, sent) - require.NoError(t, err) + bt.SendRev(docID, "100@CBL1", []byte(`{"key": "val"}`), 
blip.Properties{db.RevMessageHistory: strings.Join(history, ",")}) // assert that the bucket doc is as expected bucketDoc, _, err := collection.GetDocWithXattrs(ctx, docID, db.DocUnmarshalAll) diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index 6b551ba741..c2bbb4034c 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -2136,8 +2136,7 @@ func TestImportFilterTimeout(t *testing.T) { rest.AssertStatus(t, response, 404) syncFnFinishedWG.Done() }() - timeoutErr := rest.WaitWithTimeout(&syncFnFinishedWG, time.Second*15) - assert.NoError(t, timeoutErr) + rest.WaitWithTimeout(t, &syncFnFinishedWG, time.Second*15) } func TestImportRollback(t *testing.T) { diff --git a/rest/sync_fn_test.go b/rest/sync_fn_test.go index 80e872a9ef..026ff4268a 100644 --- a/rest/sync_fn_test.go +++ b/rest/sync_fn_test.go @@ -401,8 +401,7 @@ func TestSyncFnTimeout(t *testing.T) { AssertHTTPErrorReason(t, response, 500, "JS sync function timed out") syncFnFinishedWG.Done() }() - timeoutErr := WaitWithTimeout(&syncFnFinishedWG, time.Second*15) - assert.NoError(t, timeoutErr) + WaitWithTimeout(t, &syncFnFinishedWG, time.Second*15) } func TestResyncErrorScenariosUsingDCPStream(t *testing.T) { diff --git a/rest/user_api_test.go b/rest/user_api_test.go index 2d301b4141..b672b4588e 100644 --- a/rest/user_api_test.go +++ b/rest/user_api_test.go @@ -859,8 +859,7 @@ function(doc, oldDoc) { log.Printf("Invoking _changes?feed=continuous&since=%s&timeout=2000", since) changesResponse := rt.SendUserRequest("GET", fmt.Sprintf("/{{.keyspace}}/_changes?feed=continuous&since=%s&timeout=2000", since), "", "bernard") - changes, err := rt.ReadContinuousChanges(changesResponse) - assert.NoError(t, err) + changes := rt.ReadContinuousChanges(changesResponse) changesAccumulated = append(changesAccumulated, changes...) @@ -943,8 +942,7 @@ func TestUserDeleteDuringChangesWithAccess(t *testing.T) { } else { // case 2 - ensure no error processing the changes response. The number of entries may vary, depending // on whether the changes loop performed an additional iteration before catching the deleted user. - _, err := rt.ReadContinuousChanges(changesResponse) - assert.NoError(t, err) + rt.ReadContinuousChanges(changesResponse) } }() diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 326a09a946..2e2fa2585c 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -18,6 +18,7 @@ import ( "fmt" "io" "log" + "maps" "net" "net/http" "net/http/httptest" @@ -182,12 +183,12 @@ func newRestTester(tb testing.TB, restConfig *RestTesterConfig, collectionConfig return &rt } -// NewRestTester returns a rest tester backed by a single database and a single default collection. +// NewRestTesterDefaultCollection creates a rest tester backed by a single database and a single _default._default collection. func NewRestTesterDefaultCollection(tb testing.TB, restConfig *RestTesterConfig) *RestTester { return newRestTester(tb, restConfig, useSingleCollectionDefaultOnly, 1) } -// NewRestTester multiple collections a rest tester backed by a single database and any number of collections and the names of the keyspaces of collections created. +// NewRestTesterMultipleCollections creates a rest tester backed by a single database and the specified number of collections.
func NewRestTesterMultipleCollections(tb testing.TB, restConfig *RestTesterConfig, numCollections int) *RestTester { if !base.TestsUseNamedCollections() { tb.Skip("This test requires named collections and is running against a bucket type that does not support them") } @@ -298,9 +299,7 @@ func (rt *RestTester) Bucket() base.Bucket { sc.Unsupported.UserQueries = base.Ptr(rt.EnableUserQueries) // Allow EE-only config even in CE for testing using group IDs. - if err := sc.Validate(base.TestCtx(rt.TB()), true); err != nil { - panic("invalid RestTester StartupConfig: " + err.Error()) - } + require.NoError(rt.TB(), sc.Validate(base.TestCtx(rt.TB()), true)) // Post-validation, we can lower the bcrypt cost beyond SG limits to reduce test runtime. sc.Auth.BcryptCost = bcrypt.MinCost @@ -594,27 +593,27 @@ func (rt *RestTester) GetSingleDataStore() base.DataStore { return ds } +// WaitForDoc will wait for the specific docID to be available in the change cache by comparing the sequence number in the bucket to the latest sequence processed by the channel cache. Consider replacing with WaitForPendingChanges. func (rt *RestTester) WaitForDoc(docid string) { - seq, err := rt.SequenceForDoc(docid) - require.NoError(rt.TB(), err, "Error getting sequence for doc %s", docid) + seq := rt.SequenceForDoc(docid) rt.WaitForSequence(seq) } -func (rt *RestTester) SequenceForDoc(docid string) (seq uint64, err error) { +// SequenceForDoc returns the current sequence for a document from the bucket, failing the test if the document doesn't exist. +func (rt *RestTester) SequenceForDoc(docid string) (seq uint64) { collection, ctx := rt.GetSingleTestDatabaseCollection() doc, err := collection.GetDocument(ctx, docid, db.DocUnmarshalAll) - if err != nil { - return 0, err - } - return doc.Sequence, nil + require.NoError(rt.TB(), err, "Error getting doc %q", docid) + return doc.Sequence } -// Wait for sequence to be buffered by the channel cache +// WaitForSequence waits for the sequence to be buffered by the channel cache func (rt *RestTester) WaitForSequence(seq uint64) { collection, ctx := rt.GetSingleTestDatabaseCollection() require.NoError(rt.TB(), collection.WaitForSequence(ctx, seq)) } +// WaitForPendingChanges waits for all outstanding changes to be buffered by the channel cache. func (rt *RestTester) WaitForPendingChanges() { ctx := rt.Context() for _, collection := range rt.GetDbCollections() { @@ -1593,7 +1592,8 @@ func (bt *BlipTester) addCollectionProperty(msg *blip.Message) *blip.Message { return msg } -func (bt *BlipTester) SetCheckpoint(client string, checkpointRev string, body []byte) (sent bool, req *db.SetCheckpointMessage, res *db.SetCheckpointResponse, err error) { +// SetCheckpoint sends a setCheckpoint message with the checkpoint docID and a specific rev and returns the response. Blocks waiting for the response, and checks error status.
+func (bt *BlipTester) SetCheckpoint(client string, checkpointRev string, body []byte) *db.SetCheckpointResponse { scm := db.NewSetCheckpointMessage() scm.SetCompressed(true) @@ -1602,60 +1602,76 @@ func (bt *BlipTester) SetCheckpoint(client string, checkpointRev string, body [] scm.SetBody(body) bt.addCollectionProperty(scm.Message) - sent = bt.sender.Send(scm.Message) - if !sent { - return sent, scm, nil, fmt.Errorf("Failed to send setCheckpoint for client: %v", client) - } - - scr := &db.SetCheckpointResponse{Message: scm.Response()} - return true, scm, scr, nil + require.True(bt.TB(), bt.sender.Send(scm.Message)) + resp := scm.Response() + body, err := resp.Body() + require.NoError(bt.TB(), err) + require.NotContains(bt.TB(), resp.Properties, "Error-Code", "Error in response to setCheckpoint request. Properties:%v Body:%s", resp.Properties, body) + return &db.SetCheckpointResponse{Message: resp} } -// The docHistory should be in the same format as expected by db.PutExistingRevWithBody(), or empty if this is the first revision -func (bt *BlipTester) SendRevWithHistory(docId, docRev string, revHistory []string, body []byte, properties blip.Properties) (sent bool, req, res *blip.Message, err error) { +// newRevMessage constructs a rev message configured with the current collection and default set of parameters. properties will overwrite any default properties set by this function. +func (bt *BlipTester) newRevMessage(docID, docRev string, body []byte, properties blip.Properties) *blip.Message { revRequest := blip.NewRequest() revRequest.SetCompressed(true) revRequest.SetProfile("rev") - revRequest.Properties["id"] = docId + revRequest.Properties["id"] = docID revRequest.Properties["rev"] = docRev revRequest.Properties["deleted"] = "false" - if len(revHistory) > 0 { - revRequest.Properties["history"] = strings.Join(revHistory, ",") - } - // Override any properties which have been supplied explicitly - for k, v := range properties { - revRequest.Properties[k] = v - } + maps.Copy(revRequest.Properties, properties) bt.addCollectionProperty(revRequest) - revRequest.SetBody(body) - sent = bt.sender.Send(revRequest) - if !sent { - return sent, revRequest, nil, fmt.Errorf("Failed to send revRequest for doc: %v", docId) - } - revResponse := revRequest.Response() - if revResponse.SerialNumber() != revRequest.SerialNumber() { - return sent, revRequest, revResponse, fmt.Errorf("revResponse.SerialNumber() != revRequest.SerialNumber(). %v != %v", revResponse.SerialNumber(), revRequest.SerialNumber()) - } + return revRequest +} - // Make sure no errors. Just panic for now, but if there are tests that expect errors and want - // to use SendRev(), this could be returned. - if errorCode, ok := revResponse.Properties["Error-Code"]; ok { - body, _ := revResponse.Body() - return sent, revRequest, revResponse, fmt.Errorf("Unexpected error sending rev: %v\n%s", errorCode, body) +// SendRevWithHistory sends an unsolicited rev message and waits for the response. 
The docHistory should be in the same format as expected by db.PutExistingRevWithBody(), or empty if this is the first revision +func (bt *BlipTester) SendRevWithHistory(docID, docRev string, revHistory []string, body []byte, properties blip.Properties) (res *blip.Message) { + require.NotContains(bt.TB(), properties, "history", "If specifying history, use BlipTester.SendRev") + if len(revHistory) > 0 { + properties[db.RevMessageHistory] = strings.Join(revHistory, ",") } + return bt.SendRev(docID, docRev, body, properties) +} - return sent, revRequest, revResponse, nil - +// SendRev sends an unsolicited rev message and waits for the response. The docHistory should be in the same format as expected by db.PutExistingRevWithBody(), or empty if this is the first revision +func (bt *BlipTester) SendRev(docID, docRev string, body []byte, properties blip.Properties) (res *blip.Message) { + revRequest := bt.newRevMessage(docID, docRev, body, properties) + bt.Send(revRequest) + revResponse := revRequest.Response() + rspBody, err := revResponse.Body() + require.NoError(bt.TB(), err) + require.Empty(bt.TB(), revResponse.Properties["Error-Code"], "Error in response to rev request. Properties:%v Body:%s", revResponse.Properties, rspBody) + return revResponse } -func (bt *BlipTester) SendRev(docId, docRev string, body []byte, properties blip.Properties) (sent bool, req, res *blip.Message, err error) { +// SendRevExpectConflict sends an unsolicited rev message and waits for the response, expecting a conflict error (409). The docHistory should be in the same format as expected by db.PutExistingRevWithBody(), or empty if this is the first revision +func (bt *BlipTester) SendRevExpectConflict(docID, docRev string, body []byte, properties blip.Properties) (res *blip.Message) { + revRequest := bt.newRevMessage(docID, docRev, body, properties) + bt.Send(revRequest) + revResponse := revRequest.Response() + rspBody, err := revResponse.Body() + require.NoError(bt.TB(), err) + require.Equal(bt.TB(), "409", revResponse.Properties["Error-Code"], "Expected conflict in response to rev request. Properties:%v Body:%s", revResponse.Properties, rspBody) + return revResponse +} - return bt.SendRevWithHistory(docId, docRev, []string{}, body, properties) +// Send a blip message but do not wait for a response. +func (bt *BlipTester) Send(rq *blip.Message) { + require.True(bt.TB(), bt.sender.Send(rq)) +} +// Run is equivalent to testing.T.Run() but updates the underlying RestTester's TB. +func (bt *BlipTester) Run(name string, test func(*testing.T)) { + mainT := bt.restTester.TB().(*testing.T) + mainT.Run(name, func(t *testing.T) { + var tb testing.TB = t + old := bt.restTester.testingTB.Swap(&tb) + defer func() { bt.restTester.testingTB.Store(old) }() + test(t) + }) } // PrincipalConfigForWrite is used by GetUserPayload, GetRolePayload to remove the omitempty for ExplicitRoleNames @@ -1723,14 +1739,13 @@ func addChannelsToPrincipal(config PrincipalConfigForWrite, ds sgbucket.DataStor return payload, nil } -func getChangesHandler(changesFinishedWg, revsFinishedWg *sync.WaitGroup) func(request *blip.Message) { +// getChangesHandler returns a changes handler which will respond to all changes messages and ask to be set rev or norev messages. 
+func getChangesHandler(t testing.TB, changesFinishedWg, revsFinishedWg *sync.WaitGroup) func(request *blip.Message) { return func(request *blip.Message) { // Send a response telling the other side we want ALL revisions body, err := request.Body() - if err != nil { - panic(fmt.Sprintf("Error getting request body: %v", err)) - } + require.NoError(t, err, "Error getting request body") if string(body) == "null" { changesFinishedWg.Done() @@ -1742,9 +1757,7 @@ func getChangesHandler(changesFinishedWg, revsFinishedWg *sync.WaitGroup) func(r // unmarshal into json array changesBatch := [][]interface{}{} - if err := base.JSONUnmarshal(body, &changesBatch); err != nil { - panic(fmt.Sprintf("Error unmarshalling changes. Body: %vs. Error: %v", string(body), err)) - } + require.NoError(t, base.JSONUnmarshal(body, &changesBatch)) responseVal := [][]interface{}{} for _, change := range changesBatch { @@ -1756,16 +1769,14 @@ func getChangesHandler(changesFinishedWg, revsFinishedWg *sync.WaitGroup) func(r response := request.Response() responseValBytes, err := base.JSONMarshal(responseVal) log.Printf("responseValBytes: %s", responseValBytes) - if err != nil { - panic(fmt.Sprintf("Error marshalling response: %v", err)) - } + require.NoError(t, err, "Error marshalling response") response.SetBody(responseValBytes) } } } -// Get a doc at a particular revision from Sync Gateway. +// GetDocAtRev sets up blip handlers to get a doc at a particular revision from Sync Gateway. Consider using BlipTesterClient to do this behavior. // // Warning: this can only be called from a single goroutine, given the fact it registers profile handlers. // @@ -1793,7 +1804,7 @@ func (bt *BlipTester) GetDocAtRev(requestedDocID, requestedDocRev string) (resul }() // -------- Changes handler callback -------- - bt.blipContext.HandlerForProfile["changes"] = getChangesHandler(&changesFinishedWg, &revsFinishedWg) + bt.blipContext.HandlerForProfile["changes"] = getChangesHandler(bt.TB(), &changesFinishedWg, &revsFinishedWg) // -------- Norev handler callback -------- bt.blipContext.HandlerForProfile["norev"] = func(request *blip.Message) { @@ -1808,14 +1819,9 @@ func (bt *BlipTester) GetDocAtRev(requestedDocID, requestedDocRev string) (resul defer revsFinishedWg.Done() body, err := request.Body() - if err != nil { - panic(fmt.Sprintf("Unexpected err getting request body: %v", err)) - } + require.NoError(bt.TB(), err, "Error getting request body") var doc RestDocument - err = base.JSONUnmarshal(body, &doc) - if err != nil { - panic(fmt.Sprintf("Unexpected err: %v", err)) - } + require.NoError(bt.TB(), base.JSONUnmarshal(body, &doc)) docId := request.Properties["id"] docRev := request.Properties["rev"] doc.SetID(docId) @@ -1834,16 +1840,11 @@ func (bt *BlipTester) GetDocAtRev(requestedDocID, requestedDocRev string) (resul subChangesRequest.Properties["continuous"] = "false" bt.addCollectionProperty(subChangesRequest) - sent := bt.sender.Send(subChangesRequest) - if !sent { - panic("Unable to subscribe to changes.") - } - - require.NoError(bt.TB(), WaitWithTimeout(&changesFinishedWg, time.Second*30)) - require.NoError(bt.TB(), WaitWithTimeout(&revsFinishedWg, time.Second*30)) + bt.Send(subChangesRequest) + WaitWithTimeout(bt.TB(), &changesFinishedWg, time.Second*30) + WaitWithTimeout(bt.TB(), &revsFinishedWg, time.Second*30) return resultDoc, resultErr - } type SendRevWithAttachmentInput struct { @@ -1857,8 +1858,9 @@ type SendRevWithAttachmentInput struct { body []byte } +// SendRevWithAttachment will send a single rev message and block 
until the attachments are returned. The rev message is returned and must be checked for errors. // Warning: this can only be called from a single goroutine, given the fact it registers profile handlers. -func (bt *BlipTester) SendRevWithAttachment(input SendRevWithAttachmentInput) (sent bool, req, res *blip.Message) { +func (bt *BlipTester) SendRevWithAttachment(input SendRevWithAttachmentInput) (res *blip.Message) { defer func() { // Clean up all profile handlers that are registered as part of this test @@ -1876,10 +1878,7 @@ func (bt *BlipTester) SendRevWithAttachment(input SendRevWithAttachmentInput) (s doc := NewRestDocument() if len(input.body) > 0 { - unmarshalErr := json.Unmarshal(input.body, &doc) - if unmarshalErr != nil { - panic(fmt.Sprintf("Error unmarshalling body into restDocument. Error: %v", unmarshalErr)) - } + require.NoError(bt.TB(), json.Unmarshal(input.body, &doc)) } doc.SetAttachments(db.AttachmentMap{ @@ -1887,37 +1886,32 @@ func (bt *BlipTester) SendRevWithAttachment(input SendRevWithAttachmentInput) (s }) docBody, err := base.JSONMarshal(doc) - if err != nil { - panic(fmt.Sprintf("Error marshalling doc. Error: %v", err)) - } + require.NoError(bt.TB(), err, "Error marshalling doc") getAttachmentWg := sync.WaitGroup{} bt.blipContext.HandlerForProfile["getAttachment"] = func(request *blip.Message) { defer getAttachmentWg.Done() - if request.Properties["digest"] != myAttachment.Digest { - panic(fmt.Sprintf("Unexpected digest. Got: %v, expected: %v", request.Properties["digest"], myAttachment.Digest)) - } + require.Equal(bt.TB(), myAttachment.Digest, request.Properties["digest"]) response := request.Response() response.SetBody([]byte(input.attachmentBody)) } // Push a rev with an attachment. getAttachmentWg.Add(1) - sent, req, res, _ = bt.SendRevWithHistory( - input.docId, - input.revId, - input.history, - docBody, - blip.Properties{}, - ) + rq := bt.newRevMessage(input.docId, input.revId, docBody, blip.Properties{ + db.RevMessageHistory: strings.Join(input.history, ","), + }) + bt.Send(rq) // Expect a callback to the getAttachment endpoint getAttachmentWg.Wait() - return sent, req, res - + return rq.Response() } +// WaitForNumChanges waits for at least the number of document changes and returns the changes as they are in the changes messages: +// +// [[sequence, docID, revID, deleted], [sequence, docID, revID, deleted]] func (bt *BlipTester) WaitForNumChanges(numChangesExpected int) (changes [][]interface{}) { retryWorker := func() (shouldRetry bool, err error, value [][]any) { @@ -1958,9 +1952,7 @@ func (bt *BlipTester) GetChanges() (changes [][]interface{}) { for changeMsg := range chanChanges { body, err := changeMsg.Body() - if err != nil { - panic(fmt.Sprintf("Error getting request body: %v", err)) - } + require.NoError(bt.TB(), err, "Error getting request body") if string(body) == "null" { // the other side indicated that it's done sending changes. @@ -1972,9 +1964,7 @@ func (bt *BlipTester) GetChanges() (changes [][]interface{}) { // unmarshal into json array changesBatch := [][]interface{}{} - if err := base.JSONUnmarshal(body, &changesBatch); err != nil { - panic(fmt.Sprintf("Error unmarshalling changes. Body: %vs. Error: %v", string(body), err)) - } + require.NoError(bt.TB(), base.JSONUnmarshal(body, &changesBatch), "Error unmarshalling changes. Body: %v", string(body)) collectedChanges = append(collectedChanges, changesBatch...) 
@@ -1994,7 +1984,7 @@ func (bt *BlipTester) WaitForNumDocsViaChanges(numDocsExpected int) (docs map[st return docs } -// Get all documents and their attachments via the following steps: +// PullDocs gets all documents and their attachments via the following steps: // // - Invoking one-shot subChanges request // - Responding to all incoming "changes" requests from peer to request the changed rev, and accumulate rev body @@ -2020,21 +2010,17 @@ func (bt *BlipTester) PullDocs() (docs map[string]RestDocument) { // -------- Changes handler callback -------- // When this test sends subChanges, Sync Gateway will send a changes request that must be handled - bt.blipContext.HandlerForProfile["changes"] = getChangesHandler(&changesFinishedWg, &revsFinishedWg) + bt.blipContext.HandlerForProfile["changes"] = getChangesHandler(bt.TB(), &changesFinishedWg, &revsFinishedWg) // -------- Rev handler callback -------- bt.blipContext.HandlerForProfile["rev"] = func(request *blip.Message) { defer revsFinishedWg.Done() body, err := request.Body() - if err != nil { - panic(fmt.Sprintf("Unexpected err getting request body: %v", err)) - } + require.NoError(bt.TB(), err) + var doc RestDocument - err = base.JSONUnmarshal(body, &doc) - if err != nil { - panic(fmt.Sprintf("Unexpected err: %v", err)) - } + require.NoError(bt.TB(), base.JSONUnmarshal(body, &doc)) docId := request.Properties["id"] docRev := request.Properties["rev"] doc.SetID(docId) @@ -2045,9 +2031,7 @@ func (bt *BlipTester) PullDocs() (docs map[string]RestDocument) { docsLock.Unlock() attachments, err := doc.GetAttachments() - if err != nil { - panic(fmt.Sprintf("Unexpected err: %v", err)) - } + require.NoError(bt.TB(), err) for _, attachment := range attachments { @@ -2059,16 +2043,10 @@ func (bt *BlipTester) PullDocs() (docs map[string]RestDocument) { getAttachmentRequest.Properties[db.GetAttachmentID] = docId } bt.addCollectionProperty(getAttachmentRequest) - sent := bt.sender.Send(getAttachmentRequest) - if !sent { - panic("Unable to get attachment.") - } + bt.Send(getAttachmentRequest) getAttachmentResponse := getAttachmentRequest.Response() getAttachmentBody, getAttachmentErr := getAttachmentResponse.Body() - if getAttachmentErr != nil { - panic(fmt.Sprintf("Unexpected err: %v", err)) - } - log.Printf("getAttachmentBody: %s", getAttachmentBody) + require.NoError(bt.TB(), getAttachmentErr, "Error getting attachment body") attachment.Data = getAttachmentBody } @@ -2095,10 +2073,7 @@ func (bt *BlipTester) PullDocs() (docs map[string]RestDocument) { subChangesRequest.Properties["continuous"] = "false" bt.addCollectionProperty(subChangesRequest) - sent := bt.sender.Send(subChangesRequest) - if !sent { - panic("Unable to subscribe to changes.") - } + bt.Send(subChangesRequest) changesFinishedWg.Wait() @@ -2120,9 +2095,7 @@ func (bt *BlipTester) SubscribeToChanges(continuous bool, changes chan<- *blip.M response := request.Response() emptyResponseVal := []interface{}{} emptyResponseValBytes, err := base.JSONMarshal(emptyResponseVal) - if err != nil { - panic(fmt.Sprintf("Error marshalling response: %v", err)) - } + require.NoError(bt.TB(), err) response.SetBody(emptyResponseValBytes) } @@ -2139,19 +2112,10 @@ func (bt *BlipTester) SubscribeToChanges(continuous bool, changes chan<- *blip.M subChangesRequest.Properties["continuous"] = "false" } - sent := bt.sender.Send(subChangesRequest) - if !sent { - panic("Unable to subscribe to changes.") - } + bt.Send(subChangesRequest) subChangesResponse := subChangesRequest.Response() - if 
subChangesResponse.SerialNumber() != subChangesRequest.SerialNumber() {
-		panic(fmt.Sprintf("subChangesResponse.SerialNumber() != subChangesRequest.SerialNumber(). %v != %v", subChangesResponse.SerialNumber(), subChangesRequest.SerialNumber()))
-	}
-	errCode := subChangesResponse.Properties[db.BlipErrorCode]
-	if errCode != "" {
-		bt.restTester.TB().Fatalf("Error sending subChanges request: %s", errCode)
-	}
-
+	require.Equal(bt.TB(), subChangesResponse.SerialNumber(), subChangesRequest.SerialNumber())
+	require.NotContains(bt.TB(), subChangesResponse.Properties, db.BlipErrorCode, "Error in response to subChanges request. Properties:%v", subChangesResponse.Properties)
 }
 // Helper for comparing BLIP changes received with expected BLIP changes
@@ -2293,8 +2257,8 @@ func (d RestDocument) IsRemoved() bool {
 	return removed.(bool)
 }
-// Wait for the WaitGroup, or return an error if the wg.Wait() doesn't return within timeout
-func WaitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
+// WaitWithTimeout calls WaitGroup.Wait() and fails the test if the Wait does not return within the timeout.
+func WaitWithTimeout(t testing.TB, wg *sync.WaitGroup, timeout time.Duration) {
 	// Create a channel so that a goroutine waiting on the waitgroup can send it's result (if any)
 	wgFinished := make(chan bool)
@@ -2308,11 +2272,10 @@ func WaitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
 	defer timer.Stop()
 	select {
 	case <-wgFinished:
-		return nil
+		return
 	case <-timer.C:
-		return fmt.Errorf("Timed out waiting after %v", timeout)
+		require.FailNow(t, fmt.Sprintf("Timed out waiting after %.2f sec", timeout.Seconds()))
 	}
-
 }
 // NewHTTPTestServerOnListener returns a new httptest server, which is configured to listen on the given listener.
@@ -2552,8 +2515,8 @@ func (rt *RestTester) getCollectionsForBLIP() []string {
 	return collections
 }
-// Reads continuous changes feed response into slice of ChangeEntry
-func (rt *RestTester) ReadContinuousChanges(response *TestResponse) ([]db.ChangeEntry, error) {
+// ReadContinuousChanges reads a continuous changes feed REST response into a slice of ChangeEntry
+func (rt *RestTester) ReadContinuousChanges(response *TestResponse) []db.ChangeEntry {
 	var change db.ChangeEntry
 	changes := make([]db.ChangeEntry, 0)
 	reader := bufio.NewReader(response.Body)
@@ -2563,30 +2526,23 @@ func (rt *RestTester) ReadContinuousChanges(response *TestResponse) ([]db.Change
 			// done
 			break
 		}
-		if readError != nil {
-			// unexpected read error
-			return changes, readError
-		}
+		require.NoError(rt.TB(), readError)
 		entry = bytes.TrimSpace(entry)
 		if len(entry) > 0 {
-			err := base.JSONUnmarshal(entry, &change)
-			if err != nil {
-				return changes, err
-			}
+			require.NoError(rt.TB(), base.JSONUnmarshal(entry, &change))
 			changes = append(changes, change)
 			log.Printf("Got change ==> %v", change)
 		}
 	}
-	return changes, nil
+	return changes
 }
 // RequireContinuousFeedChangesCount Calls a changes feed on every collection and asserts that the nth expected change is
 // the number of changes for the nth collection.
 func (rt *RestTester) RequireContinuousFeedChangesCount(t testing.TB, username string, keyspace int, expectedChanges int, timeout int) {
 	resp := rt.SendUserRequest("GET", fmt.Sprintf("/{{.keyspace%d}}/_changes?feed=continuous&timeout=%d", keyspace, timeout), "", username)
-	changes, err := rt.ReadContinuousChanges(resp)
-	assert.NoError(t, err)
+	changes := rt.ReadContinuousChanges(resp)
 	require.Len(t, changes, expectedChanges)
 }
diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go
index a3e0a9f554..881566f52a 100644
--- a/rest/utilities_testing_blip_client.go
+++ b/rest/utilities_testing_blip_client.go
@@ -1295,10 +1295,8 @@ func (btcc *BlipTesterCollectionClient) StartPushWithOpts(opts BlipTesterPushOpt
 	go func() {
 		defer func() {
 			waitTime := time.Second * 5
-			if assert.NoError(btcc.TB(), WaitWithTimeout(&btcc.pushGoroutineWg, waitTime),
-				"timed out waiting for push replication goroutines to finish after %v", waitTime) {
-				btcc.pushRunning.Set(false)
-			}
+			WaitWithTimeout(btcc.TB(), &btcc.pushGoroutineWg, waitTime)
+			btcc.pushRunning.Set(false)
 		}()
 		defer btcc.pushGoroutineWg.Done()
 		// TODO: CBG-4401 wire up opts.changesBatchSize and implement a flush timeout for when the client doesn't fill the batch

From 211e9769ea822e14a2633d8d935f21fbcc55d589 Mon Sep 17 00:00:00 2001
From: Tor Colvin
Date: Tue, 16 Sep 2025 12:43:12 -0400
Subject: [PATCH 2/2] fix spelling

---
 rest/utilities_testing.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go
index 2e2fa2585c..1ad8c6b157 100644
--- a/rest/utilities_testing.go
+++ b/rest/utilities_testing.go
@@ -183,7 +183,7 @@ func newRestTester(tb testing.TB, restConfig *RestTesterConfig, collectionConfig
 	return &rt
 }
-// NewRestTesterDefaultCollection creates a rest tester backed by a single database and a single _default._default collection.
+// NewRestTesterDefaultCollection creates a rest tester backed by a single database and a single _default._default collection.
 func NewRestTesterDefaultCollection(tb testing.TB, restConfig *RestTesterConfig) *RestTester {
 	return newRestTester(tb, restConfig, useSingleCollectionDefaultOnly, 1)
 }
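Illustrative usage (not part of the patch): a minimal sketch of how a test might exercise the new rev helpers, assuming the test has already constructed a *BlipTester named bt (setup helpers are outside this diff) and that the document IDs, revision IDs, and bodies are arbitrary test data. With default conflict handling on the server, pushing a second, unrelated revision of the same document is expected to be rejected with a 409.

	func exampleSendRevAndConflict(bt *BlipTester) {
		// SendRev fails the test if the response carries any Error-Code property.
		bt.SendRev("doc1", "1-abc", []byte(`{"key": "val"}`), blip.Properties{})

		// A different first-generation revision of the same document shares no history with 1-abc,
		// so the server is expected to reply with a 409; SendRevExpectConflict asserts exactly that.
		bt.SendRevExpectConflict("doc1", "1-def", []byte(`{"key": "other"}`), blip.Properties{})
	}

The same pattern applies to WaitWithTimeout, which now takes a testing.TB and fails the test itself on timeout rather than returning an error for each caller to check.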