From 0971f8168ebfcde2fb83b8a9dee29ef46a3fd261 Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Tue, 20 Jun 2023 17:13:21 -0400 Subject: [PATCH] chore: Add error param to operation queue, 'nack' function Add an error parameter to the operation queue's Nack function to provide information on why the batch failed. Signed-off-by: Bob Stasyszyn --- pkg/batch/cutter/cutter.go | 4 +- pkg/batch/cutter/cutter_test.go | 3 +- pkg/batch/opqueue/memqueue.go | 4 +- pkg/batch/opqueue/memqueue_test.go | 3 +- pkg/batch/writer.go | 2 +- pkg/batch/writer_test.go | 2 +- pkg/mocks/operationqueue.gen.go | 285 +++++++++++++++++------------ 7 files changed, 179 insertions(+), 124 deletions(-) diff --git a/pkg/batch/cutter/cutter.go b/pkg/batch/cutter/cutter.go index 8f19accc..ca34ca5b 100644 --- a/pkg/batch/cutter/cutter.go +++ b/pkg/batch/cutter/cutter.go @@ -26,7 +26,7 @@ type OperationQueue interface { // - The operations that are to be removed. // - The 'Ack' function that must be called to commit the remove. // - The 'Nack' function that must be called to roll back the remove. - Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(), err error) + Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(error), err error) // Peek returns (up to) the given number of operations from the head of the queue but does not remove them. Peek(num uint) (operation.QueuedOperationsAtTime, error) // Len returns the number of operation in the queue. @@ -48,7 +48,7 @@ type Result struct { // Ack commits the remove from the queue and returns the number of pending operations. Ack func() uint // Nack rolls back the remove so that a retry may occur. - Nack func() + Nack func(error) } // BatchCutter implements batch cutting. diff --git a/pkg/batch/cutter/cutter_test.go b/pkg/batch/cutter/cutter_test.go index 4a63703d..d56e600c 100644 --- a/pkg/batch/cutter/cutter_test.go +++ b/pkg/batch/cutter/cutter_test.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package cutter import ( + "errors" "fmt" "testing" @@ -68,7 +69,7 @@ func TestBatchCutter(t *testing.T) { require.Zero(t, result.Pending) require.Equal(t, uint64(10), result.ProtocolVersion) - result.Nack() + result.Nack(errors.New("injected error")) // After a rollback, the operations should still be in the queue result, err = r.Cut(true) diff --git a/pkg/batch/opqueue/memqueue.go b/pkg/batch/opqueue/memqueue.go index 770bb62d..8cddfc50 100644 --- a/pkg/batch/opqueue/memqueue.go +++ b/pkg/batch/opqueue/memqueue.go @@ -45,7 +45,7 @@ func (q *MemQueue) Peek(num uint) (operation.QueuedOperationsAtTime, error) { } // Remove removes (up to) the given number of items from the head of the queue. -func (q *MemQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(), err error) { +func (q *MemQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(error), err error) { q.mutex.Lock() defer q.mutex.Unlock() @@ -64,7 +64,7 @@ func (q *MemQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack f return uint(len(q.items)) }, - func() { + func(error) { q.mutex.Lock() defer q.mutex.Unlock() diff --git a/pkg/batch/opqueue/memqueue_test.go b/pkg/batch/opqueue/memqueue_test.go index eb101918..891a71a8 100644 --- a/pkg/batch/opqueue/memqueue_test.go +++ b/pkg/batch/opqueue/memqueue_test.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package opqueue import ( + "errors" "testing" "github.com/stretchr/testify/require" @@ -76,7 +77,7 @@ func TestMemQueue(t *testing.T) { require.Equal(t, *op2, ops[0].QueuedOperation) require.Equal(t, *op3, ops[1].QueuedOperation) - nack() + nack(errors.New("injected error")) ops, ack, _, err = q.Remove(5) require.NoError(t, err) diff --git a/pkg/batch/writer.go b/pkg/batch/writer.go index 9e5342a9..730c2672 100644 --- a/pkg/batch/writer.go +++ b/pkg/batch/writer.go @@ -239,7 +239,7 @@ func (r *Writer) cutAndProcess(forceCut bool) (numProcessed int, pending uint, e if err != nil { r.logger.Error("Error processing batch operations", logfields.WithTotal(len(result.Operations)), log.WithError(err)) - result.Nack() + result.Nack(err) return 0, result.Pending + uint(len(result.Operations)), err } diff --git a/pkg/batch/writer_test.go b/pkg/batch/writer_test.go index 59833ba3..82d7b3b3 100644 --- a/pkg/batch/writer_test.go +++ b/pkg/batch/writer_test.go @@ -377,7 +377,7 @@ func TestProcessError(t *testing.T) { q.LenReturns(1) q.PeekReturns(invalidQueue, nil) - q.RemoveReturns(nil, func() uint { return 0 }, func() {}, nil) + q.RemoveReturns(nil, func() uint { return 0 }, func(error) {}, nil) ctx := newMockContext() ctx.ProtocolClient.Protocol.MaxOperationCount = 1 diff --git a/pkg/mocks/operationqueue.gen.go b/pkg/mocks/operationqueue.gen.go index 7a58901e..0b27bbd5 100644 --- a/pkg/mocks/operationqueue.gen.go +++ b/pkg/mocks/operationqueue.gen.go @@ -8,11 +8,11 @@ import ( ) type OperationQueue struct { - AddStub func(data *operation.QueuedOperation, protocolVersion uint64) (uint, error) + AddStub func(*operation.QueuedOperation, uint64) (uint, error) addMutex sync.RWMutex addArgsForCall []struct { - data *operation.QueuedOperation - protocolVersion uint64 + arg1 *operation.QueuedOperation + arg2 uint64 } addReturns struct { result1 uint @@ -22,27 +22,20 @@ type OperationQueue struct { result1 uint result2 error } - RemoveStub func(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(), err error) - removeMutex sync.RWMutex - removeArgsForCall []struct { - num uint + LenStub func() uint + lenMutex sync.RWMutex + lenArgsForCall []struct { } - removeReturns struct { - result1 operation.QueuedOperationsAtTime - result2 func() uint - result3 func() - result4 error + lenReturns struct { + result1 uint } - removeReturnsOnCall map[int]struct { - result1 operation.QueuedOperationsAtTime - result2 func() uint - result3 func() - result4 error + lenReturnsOnCall map[int]struct { + result1 uint } - PeekStub func(num uint) (operation.QueuedOperationsAtTime, error) + PeekStub func(uint) (operation.QueuedOperationsAtTime, error) peekMutex sync.RWMutex peekArgsForCall []struct { - num uint + arg1 uint } peekReturns struct { result1 operation.QueuedOperationsAtTime @@ -52,35 +45,45 @@ type OperationQueue struct { result1 operation.QueuedOperationsAtTime result2 error } - LenStub func() uint - lenMutex sync.RWMutex - lenArgsForCall []struct{} - lenReturns struct { - result1 uint + RemoveStub func(uint) (operation.QueuedOperationsAtTime, func() uint, func(error), error) + removeMutex sync.RWMutex + removeArgsForCall []struct { + arg1 uint } - lenReturnsOnCall map[int]struct { - result1 uint + removeReturns struct { + result1 operation.QueuedOperationsAtTime + result2 func() uint + result3 func(error) + result4 error + } + removeReturnsOnCall map[int]struct { + result1 operation.QueuedOperationsAtTime + result2 func() uint + result3 func(error) + result4 error } invocations map[string][][]interface{} invocationsMutex sync.RWMutex } -func (fake *OperationQueue) Add(data *operation.QueuedOperation, protocolVersion uint64) (uint, error) { +func (fake *OperationQueue) Add(arg1 *operation.QueuedOperation, arg2 uint64) (uint, error) { fake.addMutex.Lock() ret, specificReturn := fake.addReturnsOnCall[len(fake.addArgsForCall)] fake.addArgsForCall = append(fake.addArgsForCall, struct { - data *operation.QueuedOperation - protocolVersion uint64 - }{data, protocolVersion}) - fake.recordInvocation("Add", []interface{}{data, protocolVersion}) + arg1 *operation.QueuedOperation + arg2 uint64 + }{arg1, arg2}) + stub := fake.AddStub + fakeReturns := fake.addReturns + fake.recordInvocation("Add", []interface{}{arg1, arg2}) fake.addMutex.Unlock() - if fake.AddStub != nil { - return fake.AddStub(data, protocolVersion) + if stub != nil { + return stub(arg1, arg2) } if specificReturn { return ret.result1, ret.result2 } - return fake.addReturns.result1, fake.addReturns.result2 + return fakeReturns.result1, fakeReturns.result2 } func (fake *OperationQueue) AddCallCount() int { @@ -89,13 +92,22 @@ func (fake *OperationQueue) AddCallCount() int { return len(fake.addArgsForCall) } +func (fake *OperationQueue) AddCalls(stub func(*operation.QueuedOperation, uint64) (uint, error)) { + fake.addMutex.Lock() + defer fake.addMutex.Unlock() + fake.AddStub = stub +} + func (fake *OperationQueue) AddArgsForCall(i int) (*operation.QueuedOperation, uint64) { fake.addMutex.RLock() defer fake.addMutex.RUnlock() - return fake.addArgsForCall[i].data, fake.addArgsForCall[i].protocolVersion + argsForCall := fake.addArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 } func (fake *OperationQueue) AddReturns(result1 uint, result2 error) { + fake.addMutex.Lock() + defer fake.addMutex.Unlock() fake.AddStub = nil fake.addReturns = struct { result1 uint @@ -104,6 +116,8 @@ func (fake *OperationQueue) AddReturns(result1 uint, result2 error) { } func (fake *OperationQueue) AddReturnsOnCall(i int, result1 uint, result2 error) { + fake.addMutex.Lock() + defer fake.addMutex.Unlock() fake.AddStub = nil if fake.addReturnsOnCall == nil { fake.addReturnsOnCall = make(map[int]struct { @@ -117,78 +131,76 @@ func (fake *OperationQueue) AddReturnsOnCall(i int, result1 uint, result2 error) }{result1, result2} } -func (fake *OperationQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(), err error) { - fake.removeMutex.Lock() - ret, specificReturn := fake.removeReturnsOnCall[len(fake.removeArgsForCall)] - fake.removeArgsForCall = append(fake.removeArgsForCall, struct { - num uint - }{num}) - fake.recordInvocation("Remove", []interface{}{num}) - fake.removeMutex.Unlock() - if fake.RemoveStub != nil { - return fake.RemoveStub(num) +func (fake *OperationQueue) Len() uint { + fake.lenMutex.Lock() + ret, specificReturn := fake.lenReturnsOnCall[len(fake.lenArgsForCall)] + fake.lenArgsForCall = append(fake.lenArgsForCall, struct { + }{}) + stub := fake.LenStub + fakeReturns := fake.lenReturns + fake.recordInvocation("Len", []interface{}{}) + fake.lenMutex.Unlock() + if stub != nil { + return stub() } if specificReturn { - return ret.result1, ret.result2, ret.result3, ret.result4 + return ret.result1 } - return fake.removeReturns.result1, fake.removeReturns.result2, fake.removeReturns.result3, fake.removeReturns.result4 + return fakeReturns.result1 } -func (fake *OperationQueue) RemoveCallCount() int { - fake.removeMutex.RLock() - defer fake.removeMutex.RUnlock() - return len(fake.removeArgsForCall) +func (fake *OperationQueue) LenCallCount() int { + fake.lenMutex.RLock() + defer fake.lenMutex.RUnlock() + return len(fake.lenArgsForCall) } -func (fake *OperationQueue) RemoveArgsForCall(i int) uint { - fake.removeMutex.RLock() - defer fake.removeMutex.RUnlock() - return fake.removeArgsForCall[i].num +func (fake *OperationQueue) LenCalls(stub func() uint) { + fake.lenMutex.Lock() + defer fake.lenMutex.Unlock() + fake.LenStub = stub } -func (fake *OperationQueue) RemoveReturns(result1 operation.QueuedOperationsAtTime, result2 func() uint, result3 func(), result4 error) { - fake.RemoveStub = nil - fake.removeReturns = struct { - result1 operation.QueuedOperationsAtTime - result2 func() uint - result3 func() - result4 error - }{result1, result2, result3, result4} +func (fake *OperationQueue) LenReturns(result1 uint) { + fake.lenMutex.Lock() + defer fake.lenMutex.Unlock() + fake.LenStub = nil + fake.lenReturns = struct { + result1 uint + }{result1} } -func (fake *OperationQueue) RemoveReturnsOnCall(i int, result1 operation.QueuedOperationsAtTime, result2 func() uint, result3 func(), result4 error) { - fake.RemoveStub = nil - if fake.removeReturnsOnCall == nil { - fake.removeReturnsOnCall = make(map[int]struct { - result1 operation.QueuedOperationsAtTime - result2 func() uint - result3 func() - result4 error +func (fake *OperationQueue) LenReturnsOnCall(i int, result1 uint) { + fake.lenMutex.Lock() + defer fake.lenMutex.Unlock() + fake.LenStub = nil + if fake.lenReturnsOnCall == nil { + fake.lenReturnsOnCall = make(map[int]struct { + result1 uint }) } - fake.removeReturnsOnCall[i] = struct { - result1 operation.QueuedOperationsAtTime - result2 func() uint - result3 func() - result4 error - }{result1, result2, result3, result4} + fake.lenReturnsOnCall[i] = struct { + result1 uint + }{result1} } -func (fake *OperationQueue) Peek(num uint) (operation.QueuedOperationsAtTime, error) { +func (fake *OperationQueue) Peek(arg1 uint) (operation.QueuedOperationsAtTime, error) { fake.peekMutex.Lock() ret, specificReturn := fake.peekReturnsOnCall[len(fake.peekArgsForCall)] fake.peekArgsForCall = append(fake.peekArgsForCall, struct { - num uint - }{num}) - fake.recordInvocation("Peek", []interface{}{num}) + arg1 uint + }{arg1}) + stub := fake.PeekStub + fakeReturns := fake.peekReturns + fake.recordInvocation("Peek", []interface{}{arg1}) fake.peekMutex.Unlock() - if fake.PeekStub != nil { - return fake.PeekStub(num) + if stub != nil { + return stub(arg1) } if specificReturn { return ret.result1, ret.result2 } - return fake.peekReturns.result1, fake.peekReturns.result2 + return fakeReturns.result1, fakeReturns.result2 } func (fake *OperationQueue) PeekCallCount() int { @@ -197,13 +209,22 @@ func (fake *OperationQueue) PeekCallCount() int { return len(fake.peekArgsForCall) } +func (fake *OperationQueue) PeekCalls(stub func(uint) (operation.QueuedOperationsAtTime, error)) { + fake.peekMutex.Lock() + defer fake.peekMutex.Unlock() + fake.PeekStub = stub +} + func (fake *OperationQueue) PeekArgsForCall(i int) uint { fake.peekMutex.RLock() defer fake.peekMutex.RUnlock() - return fake.peekArgsForCall[i].num + argsForCall := fake.peekArgsForCall[i] + return argsForCall.arg1 } func (fake *OperationQueue) PeekReturns(result1 operation.QueuedOperationsAtTime, result2 error) { + fake.peekMutex.Lock() + defer fake.peekMutex.Unlock() fake.PeekStub = nil fake.peekReturns = struct { result1 operation.QueuedOperationsAtTime @@ -212,6 +233,8 @@ func (fake *OperationQueue) PeekReturns(result1 operation.QueuedOperationsAtTime } func (fake *OperationQueue) PeekReturnsOnCall(i int, result1 operation.QueuedOperationsAtTime, result2 error) { + fake.peekMutex.Lock() + defer fake.peekMutex.Unlock() fake.PeekStub = nil if fake.peekReturnsOnCall == nil { fake.peekReturnsOnCall = make(map[int]struct { @@ -225,44 +248,74 @@ func (fake *OperationQueue) PeekReturnsOnCall(i int, result1 operation.QueuedOpe }{result1, result2} } -func (fake *OperationQueue) Len() uint { - fake.lenMutex.Lock() - ret, specificReturn := fake.lenReturnsOnCall[len(fake.lenArgsForCall)] - fake.lenArgsForCall = append(fake.lenArgsForCall, struct{}{}) - fake.recordInvocation("Len", []interface{}{}) - fake.lenMutex.Unlock() - if fake.LenStub != nil { - return fake.LenStub() +func (fake *OperationQueue) Remove(arg1 uint) (operation.QueuedOperationsAtTime, func() uint, func(error), error) { + fake.removeMutex.Lock() + ret, specificReturn := fake.removeReturnsOnCall[len(fake.removeArgsForCall)] + fake.removeArgsForCall = append(fake.removeArgsForCall, struct { + arg1 uint + }{arg1}) + stub := fake.RemoveStub + fakeReturns := fake.removeReturns + fake.recordInvocation("Remove", []interface{}{arg1}) + fake.removeMutex.Unlock() + if stub != nil { + return stub(arg1) } if specificReturn { - return ret.result1 + return ret.result1, ret.result2, ret.result3, ret.result4 } - return fake.lenReturns.result1 + return fakeReturns.result1, fakeReturns.result2, fakeReturns.result3, fakeReturns.result4 } -func (fake *OperationQueue) LenCallCount() int { - fake.lenMutex.RLock() - defer fake.lenMutex.RUnlock() - return len(fake.lenArgsForCall) +func (fake *OperationQueue) RemoveCallCount() int { + fake.removeMutex.RLock() + defer fake.removeMutex.RUnlock() + return len(fake.removeArgsForCall) } -func (fake *OperationQueue) LenReturns(result1 uint) { - fake.LenStub = nil - fake.lenReturns = struct { - result1 uint - }{result1} +func (fake *OperationQueue) RemoveCalls(stub func(uint) (operation.QueuedOperationsAtTime, func() uint, func(error), error)) { + fake.removeMutex.Lock() + defer fake.removeMutex.Unlock() + fake.RemoveStub = stub } -func (fake *OperationQueue) LenReturnsOnCall(i int, result1 uint) { - fake.LenStub = nil - if fake.lenReturnsOnCall == nil { - fake.lenReturnsOnCall = make(map[int]struct { - result1 uint +func (fake *OperationQueue) RemoveArgsForCall(i int) uint { + fake.removeMutex.RLock() + defer fake.removeMutex.RUnlock() + argsForCall := fake.removeArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *OperationQueue) RemoveReturns(result1 operation.QueuedOperationsAtTime, result2 func() uint, result3 func(error), result4 error) { + fake.removeMutex.Lock() + defer fake.removeMutex.Unlock() + fake.RemoveStub = nil + fake.removeReturns = struct { + result1 operation.QueuedOperationsAtTime + result2 func() uint + result3 func(error) + result4 error + }{result1, result2, result3, result4} +} + +func (fake *OperationQueue) RemoveReturnsOnCall(i int, result1 operation.QueuedOperationsAtTime, result2 func() uint, result3 func(error), result4 error) { + fake.removeMutex.Lock() + defer fake.removeMutex.Unlock() + fake.RemoveStub = nil + if fake.removeReturnsOnCall == nil { + fake.removeReturnsOnCall = make(map[int]struct { + result1 operation.QueuedOperationsAtTime + result2 func() uint + result3 func(error) + result4 error }) } - fake.lenReturnsOnCall[i] = struct { - result1 uint - }{result1} + fake.removeReturnsOnCall[i] = struct { + result1 operation.QueuedOperationsAtTime + result2 func() uint + result3 func(error) + result4 error + }{result1, result2, result3, result4} } func (fake *OperationQueue) Invocations() map[string][][]interface{} { @@ -270,12 +323,12 @@ func (fake *OperationQueue) Invocations() map[string][][]interface{} { defer fake.invocationsMutex.RUnlock() fake.addMutex.RLock() defer fake.addMutex.RUnlock() - fake.removeMutex.RLock() - defer fake.removeMutex.RUnlock() - fake.peekMutex.RLock() - defer fake.peekMutex.RUnlock() fake.lenMutex.RLock() defer fake.lenMutex.RUnlock() + fake.peekMutex.RLock() + defer fake.peekMutex.RUnlock() + fake.removeMutex.RLock() + defer fake.removeMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value