Skip to content

Commit

Permalink
Merge pull request #701 from bstasyszyn/nackerror
Browse files Browse the repository at this point in the history
chore: Add error param to operation queue, 'nack' function
  • Loading branch information
fqutishat authored Jun 20, 2023
2 parents 793cbea + 0971f81 commit cca19f8
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 124 deletions.
4 changes: 2 additions & 2 deletions pkg/batch/cutter/cutter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type OperationQueue interface {
// - The operations that are to be removed.
// - The 'Ack' function that must be called to commit the remove.
// - The 'Nack' function that must be called to roll back the remove.
Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(), err error)
Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(error), err error)
// Peek returns (up to) the given number of operations from the head of the queue but does not remove them.
Peek(num uint) (operation.QueuedOperationsAtTime, error)
// Len returns the number of operation in the queue.
Expand All @@ -48,7 +48,7 @@ type Result struct {
// Ack commits the remove from the queue and returns the number of pending operations.
Ack func() uint
// Nack rolls back the remove so that a retry may occur.
Nack func()
Nack func(error)
}

// BatchCutter implements batch cutting.
Expand Down
3 changes: 2 additions & 1 deletion pkg/batch/cutter/cutter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package cutter

import (
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -68,7 +69,7 @@ func TestBatchCutter(t *testing.T) {
require.Zero(t, result.Pending)
require.Equal(t, uint64(10), result.ProtocolVersion)

result.Nack()
result.Nack(errors.New("injected error"))

// After a rollback, the operations should still be in the queue
result, err = r.Cut(true)
Expand Down
4 changes: 2 additions & 2 deletions pkg/batch/opqueue/memqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (q *MemQueue) Peek(num uint) (operation.QueuedOperationsAtTime, error) {
}

// Remove removes (up to) the given number of items from the head of the queue.
func (q *MemQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(), err error) {
func (q *MemQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack func() uint, nack func(error), err error) {
q.mutex.Lock()
defer q.mutex.Unlock()

Expand All @@ -64,7 +64,7 @@ func (q *MemQueue) Remove(num uint) (ops operation.QueuedOperationsAtTime, ack f

return uint(len(q.items))
},
func() {
func(error) {
q.mutex.Lock()
defer q.mutex.Unlock()

Expand Down
3 changes: 2 additions & 1 deletion pkg/batch/opqueue/memqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package opqueue

import (
"errors"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestMemQueue(t *testing.T) {
require.Equal(t, *op2, ops[0].QueuedOperation)
require.Equal(t, *op3, ops[1].QueuedOperation)

nack()
nack(errors.New("injected error"))

ops, ack, _, err = q.Remove(5)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/batch/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (r *Writer) cutAndProcess(forceCut bool) (numProcessed int, pending uint, e
if err != nil {
r.logger.Error("Error processing batch operations", logfields.WithTotal(len(result.Operations)), log.WithError(err))

result.Nack()
result.Nack(err)

return 0, result.Pending + uint(len(result.Operations)), err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/batch/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func TestProcessError(t *testing.T) {

q.LenReturns(1)
q.PeekReturns(invalidQueue, nil)
q.RemoveReturns(nil, func() uint { return 0 }, func() {}, nil)
q.RemoveReturns(nil, func() uint { return 0 }, func(error) {}, nil)

ctx := newMockContext()
ctx.ProtocolClient.Protocol.MaxOperationCount = 1
Expand Down
Loading

0 comments on commit cca19f8

Please sign in to comment.