Commit cfa598c

Return tracked batch errors on nacked split batches
1 parent 3f4afe2 · commit cfa598c

5 files changed (+115 -154)

CHANGELOG.md (+1)
@@ -26,6 +26,7 @@ All notable changes to this project will be documented in this file.
 ### Changed
 
 - The log events from all inputs and outputs when they first connect have been made more consistent and no longer contain any information regarding the nature of their connections.
+- Splitting message batches with a `split` processor (or custom plugins) no longer results in downstream error handling loops around nacks. This was previously implemented as a feature to ensure unbounded expanded and split batches don't flood downstream services in the event of a minority of errors. However, introducing more clever origin tracking of errored messages has eliminated the need for this undocumented behaviour.
 
 ## 4.26.0 - 2024-03-18
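
The mechanism behind this changelog entry is visible in the internal/pipeline/processor.go diff further down: each input batch is tagged with a sort group before processing, and when a split batch is nacked the failed parts are mapped back to their origin indices in a batch.Error, so only the messages that actually failed are reported upstream. Below is a minimal, self-contained sketch of that idea (not code from the commit), using hypothetical stand-in types in place of the internal message.SortGroup and batch.Error APIs:

package main

import (
    "fmt"
    "sync"
)

// part is a hypothetical stand-in for an origin-tagged message part.
type part struct {
    index int // position within the original, pre-split batch
    data  string
}

// batchError is a hypothetical stand-in for batch.Error: it records
// which origin indices failed downstream, and with what error.
type batchError struct {
    mut    sync.Mutex
    failed map[int]error
}

func (b *batchError) fail(i int, err error) {
    b.mut.Lock()
    defer b.mut.Unlock()
    if b.failed == nil {
        b.failed = map[int]error{}
    }
    b.failed[i] = err
}

func main() {
    // Tag every part with its origin index before splitting.
    original := []part{{0, "foo"}, {1, "bar"}, {2, "baz"}, {3, "qux"}}
    splits := [][]part{original[:2], original[2:]}

    var bErr batchError
    for i, split := range splits {
        if i == 1 { // simulate a downstream nack of the second split batch
            for _, p := range split {
                bErr.fail(p.index, fmt.Errorf("downstream rejected %q", p.data))
            }
        }
    }

    // Only indices 2 and 3 are reported upstream, so the input can retry
    // just those messages instead of looping over the whole batch.
    fmt.Println(bErr.failed)
}

The input layer can then consult the aggregated error to retry or route only the failed origin indices, which is what removes the need for the old nack retry loop.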

internal/impl/pure/processor_split.go (+2 -1)
@@ -24,7 +24,8 @@ func init() {
 		Description(`
 This processor is for breaking batches down into smaller ones. In order to break a single message out into multiple messages use the `+"[`unarchive` processor](/docs/components/processors/unarchive)"+`.
 
-If there is a remainder of messages after splitting a batch the remainder is also sent as a single batch. For example, if your target size was 10, and the processor received a batch of 95 message parts, the result would be 9 batches of 10 messages followed by a batch of 5 messages.`).
+If there is a remainder of messages after splitting a batch the remainder is also sent as a single batch. For example, if your target size was 10, and the processor received a batch of 95 message parts, the result would be 9 batches of 10 messages followed by a batch of 5 messages.
+`).
 		Fields(
 			service.NewIntField(splitPFieldSize).
 				Description("The target number of messages.").

internal/pipeline/pool_test.go (+12 -10)
@@ -2,7 +2,6 @@ package pipeline_test
 
 import (
 	"context"
-	"fmt"
 	"reflect"
 	"testing"
 	"time"
@@ -119,32 +118,35 @@ func TestPoolMultiMsgs(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), time.Second*30)
 	defer done()
 
-	mockProc := &mockMultiMsgProcessor{N: 3}
+	mockProc := &mockSplitProcessor{}
 
 	proc, err := pipeline.NewPool(1, log.Noop(), mockProc)
-	if err != nil {
-		t.Fatal(err)
-	}
+	require.NoError(t, err)
 
 	tChan, resChan := make(chan message.Transaction), make(chan error)
 	if err := proc.Consume(tChan); err != nil {
 		t.Fatal(err)
 	}
 
 	for j := 0; j < 10; j++ {
-		expMsgs := map[string]struct{}{}
-		for i := 0; i < mockProc.N; i++ {
-			expMsgs[fmt.Sprintf("test%v", i)] = struct{}{}
+		expMsgs := map[string]struct{}{
+			"foo test": {},
+			"bar test": {},
+			"baz test": {},
 		}
 
 		// Send message
 		select {
-		case tChan <- message.NewTransaction(message.QuickBatch(nil), resChan):
+		case tChan <- message.NewTransaction(message.Batch{
+			message.NewPart([]byte(`foo`)),
+			message.NewPart([]byte(`bar`)),
+			message.NewPart([]byte(`baz`)),
+		}, resChan):
		case <-time.After(time.Second * 5):
			t.Fatal("Timed out")
		}
 
-		for i := 0; i < mockProc.N; i++ {
+		for i := 0; i < 3; i++ {
			// Receive messages
			var procT message.Transaction
			var open bool
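
The mockSplitProcessor used here is defined elsewhere in the commit and not shown in this diff. Judging from the expectations above (three single-message batches carrying "foo test", "bar test" and "baz test"), a plausible reconstruction, assuming the internal processor interface is ProcessBatch(context.Context, message.Batch) ([]message.Batch, error) with the Part.AsBytes accessor, would be:

// Hypothetical reconstruction: splits each incoming batch into
// single-message batches, appending " test" to every payload.
type mockSplitProcessor struct{}

func (m *mockSplitProcessor) ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error) {
    var out []message.Batch
    for _, p := range b {
        np := message.NewPart(append(p.AsBytes(), []byte(" test")...))
        out = append(out, message.Batch{np})
    }
    return out, nil
}

func (m *mockSplitProcessor) Close(ctx context.Context) error { return nil }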

internal/pipeline/processor.go (+37 -50)
@@ -4,10 +4,10 @@ import (
 	"context"
 	"sync"
 
+	"github.com/benthosdev/benthos/v4/internal/batch"
 	"github.com/benthosdev/benthos/v4/internal/component"
 	"github.com/benthosdev/benthos/v4/internal/component/processor"
 	"github.com/benthosdev/benthos/v4/internal/message"
-	"github.com/benthosdev/benthos/v4/internal/old/util/throttle"
 	"github.com/benthosdev/benthos/v4/internal/shutdown"
 )
 
@@ -66,79 +66,66 @@ func (p *Processor) loop() {
 			return
 		}
 
-		resultMsgs, resultRes := processor.ExecuteAll(closeNowCtx, p.msgProcessors, tran.Payload)
-		if len(resultMsgs) == 0 {
-			if err := tran.Ack(closeNowCtx, resultRes); err != nil && closeNowCtx.Err() != nil {
+		sorter, sortBatch := message.NewSortGroup(tran.Payload)
+
+		resultBatches, err := processor.ExecuteAll(closeNowCtx, p.msgProcessors, sortBatch)
+		if len(resultBatches) == 0 || err != nil {
+			if _ = tran.Ack(closeNowCtx, err); closeNowCtx.Err() != nil {
 				return
 			}
 			continue
 		}
 
-		if len(resultMsgs) > 1 {
-			p.dispatchMessages(closeNowCtx, resultMsgs, tran.Ack)
-		} else {
+		if len(resultBatches) == 1 {
 			select {
-			case p.messagesOut <- message.NewTransactionFunc(resultMsgs[0], tran.Ack):
+			case p.messagesOut <- message.NewTransactionFunc(resultBatches[0], tran.Ack):
 			case <-p.shutSig.CloseNowChan():
 				return
 			}
+			continue
 		}
-	}
-}
 
-// dispatchMessages attempts to send a multiple messages results of processors
-// over the shared messages channel. This send is retried until success.
-func (p *Processor) dispatchMessages(ctx context.Context, msgs []message.Batch, ackFn func(context.Context, error) error) {
-	throt := throttle.New(throttle.OptCloseChan(p.shutSig.CloseAtLeisureChan()))
+		var batchErrMut sync.Mutex
+		var batchErr *batch.Error
+		var batchWG sync.WaitGroup
 
-	pending := msgs
-	for len(pending) > 0 {
-		doneChan := make(chan struct{}, len(pending))
+		for _, b := range resultBatches {
+			var wgOnce sync.Once
+			batchWG.Add(1)
+			tmpBatch := b.ShallowCopy()
 
-		var newPending []message.Batch
-		var newPendingMut sync.Mutex
+			select {
+			case p.messagesOut <- message.NewTransactionFunc(tmpBatch, func(ctx context.Context, err error) error {
+				batchErrMut.Lock()
+				defer batchErrMut.Unlock()
 
-		for _, b := range pending {
-			b := b
-			transac := message.NewTransactionFunc(b.ShallowCopy(), func(ctx context.Context, err error) error {
 				if err != nil {
-					newPendingMut.Lock()
-					newPending = append(newPending, b)
-					newPendingMut.Unlock()
-				}
-				select {
-				case doneChan <- struct{}{}:
-				default:
+					if batchErr == nil {
+						batchErr = batch.NewError(sortBatch, err)
+					}
+					for _, m := range tmpBatch {
+						batchErr.Failed(sorter.GetIndex(m), err)
+					}
 				}
-				return nil
-			})
 
-			select {
-			case p.messagesOut <- transac:
-			case <-ctx.Done():
-				select {
-				case doneChan <- struct{}{}:
-				default:
-				}
+				wgOnce.Do(func() {
+					batchWG.Done()
+				})
+				return nil
+			}):
+			case <-p.shutSig.CloseNowChan():
 				return
 			}
 		}
 
-		for i := 0; i < len(pending); i++ {
-			select {
-			case <-doneChan:
-			case <-ctx.Done():
-				return
-			}
-		}
+		batchWG.Wait()
 
-		if pending = newPending; len(pending) > 0 && !throt.Retry() {
-			return
+		if batchErr != nil {
+			_ = tran.Ack(closeNowCtx, batchErr)
+		} else {
+			_ = tran.Ack(closeNowCtx, nil)
 		}
 	}
-
-	throt.Reset()
-	_ = ackFn(ctx, nil)
 }
 
 //------------------------------------------------------------------------------
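
Two details of the new dispatch logic above are worth calling out: each split batch is sent as its own transaction whose ack callback could in principle fire more than once, so a per-batch sync.Once (wgOnce) guards the WaitGroup decrement, and the original transaction is only acked once every split batch has resolved. A stripped-down, runnable sketch of that fan-out/fan-in pattern (not code from the commit):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    var acks []func(error)

    // Fan out: one ack callback per split batch, each tied to the shared
    // WaitGroup through its own sync.Once.
    for i := 0; i < 3; i++ {
        wg.Add(1)
        var once sync.Once
        acks = append(acks, func(err error) {
            // ...record err against an aggregated batch error here...
            once.Do(wg.Done) // tolerate repeated acks for the same batch
        })
    }

    // Simulate downstream responses; the second batch is nacked twice.
    acks[0](nil)
    acks[1](fmt.Errorf("nack"))
    acks[1](fmt.Errorf("nack again"))
    acks[2](nil)

    // Fan in: the upstream transaction is acked only after every split
    // batch has resolved exactly once.
    wg.Wait()
    fmt.Println("all split batches resolved; ack upstream")
}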
