Commit f5cbc29

Fix docs and handle phantom messages
1 parent cfa598c commit f5cbc29

3 files changed: +129 -10 lines changed

internal/impl/pure/processor_split.go (+1 -2)

@@ -24,8 +24,7 @@ func init() {
         Description(`
 This processor is for breaking batches down into smaller ones. In order to break a single message out into multiple messages use the `+"[`unarchive` processor](/docs/components/processors/unarchive)"+`.
 
-If there is a remainder of messages after splitting a batch the remainder is also sent as a single batch. For example, if your target size was 10, and the processor received a batch of 95 message parts, the result would be 9 batches of 10 messages followed by a batch of 5 messages.
-`).
+If there is a remainder of messages after splitting a batch the remainder is also sent as a single batch. For example, if your target size was 10, and the processor received a batch of 95 message parts, the result would be 9 batches of 10 messages followed by a batch of 5 messages.`).
         Fields(
             service.NewIntField(splitPFieldSize).
                 Description("The target number of messages.").

internal/pipeline/processor.go (+20 -8)

@@ -85,9 +85,12 @@ func (p *Processor) loop() {
             continue
         }
 
-        var batchErrMut sync.Mutex
-        var batchErr *batch.Error
-        var batchWG sync.WaitGroup
+        var (
+            errMut     sync.Mutex
+            batchErr   *batch.Error
+            generalErr error
+            batchWG    sync.WaitGroup
+        )
 
         for _, b := range resultBatches {
             var wgOnce sync.Once
@@ -96,15 +99,22 @@ func (p *Processor) loop() {
 
             select {
             case p.messagesOut <- message.NewTransactionFunc(tmpBatch, func(ctx context.Context, err error) error {
-                batchErrMut.Lock()
-                defer batchErrMut.Unlock()
-
                 if err != nil {
+                    errMut.Lock()
+                    defer errMut.Unlock()
+
                     if batchErr == nil {
                         batchErr = batch.NewError(sortBatch, err)
                     }
                     for _, m := range tmpBatch {
-                        batchErr.Failed(sorter.GetIndex(m), err)
+                        if bIndex := sorter.GetIndex(m); bIndex >= 0 {
+                            batchErr.Failed(bIndex, err)
+                        } else {
+                            // We are unable to link this message with an origin
+                            // and therefore we must provide a general
+                            // batch-wide error instead.
+                            generalErr = err
+                        }
                     }
                 }
 
@@ -120,7 +130,9 @@ func (p *Processor) loop() {
 
         batchWG.Wait()
 
-        if batchErr != nil {
+        if generalErr != nil {
+            _ = tran.Ack(closeNowCtx, generalErr)
+        } else if batchErr != nil {
             _ = tran.Ack(closeNowCtx, batchErr)
         } else {
             _ = tran.Ack(closeNowCtx, nil)
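The behavioural part of this change is the acking decision. Failed parts are normally indexed back into the origin batch via the sort group, but a part that the processor created from scratch (a "phantom" message) was never tagged by that group, so sorter.GetIndex cannot place it and the transaction is nacked with a plain, batch-wide error instead. Below is a minimal sketch of that decision under simplified, hypothetical types: indexedError and the indexOf map stand in for the real *batch.Error and sort-group lookup, they are not the actual APIs.

package main

import (
    "errors"
    "fmt"
)

// indexedError stands in for a batch-wide error that records which
// origin indexes failed (the role played by *batch.Error upstream).
type indexedError struct {
    cause  error
    failed []int
}

func (e *indexedError) Error() string { return e.cause.Error() }

// ackError picks the error handed back upstream: if every failed part
// can be traced to an index in the origin batch, return the indexed
// error; otherwise fall back to one general error for the whole batch.
func ackError(failedParts []string, indexOf map[string]int, cause error) error {
    idxErr := &indexedError{cause: cause}
    general := false
    for _, p := range failedParts {
        if i, ok := indexOf[p]; ok {
            idxErr.failed = append(idxErr.failed, i)
        } else {
            // A phantom part has no origin to point at, so the error
            // must apply to the batch as a whole.
            general = true
        }
    }
    if general {
        return cause
    }
    return idxErr
}

func main() {
    origin := map[string]int{"foo": 0, "bar": 1, "baz": 2}
    cause := errors.New("oh no")

    err := ackError([]string{"phantom message"}, origin, cause)

    var idxErr *indexedError
    fmt.Println(errors.As(err, &idxErr)) // false: downstream sees a plain error
}

This mirrors the bIndex >= 0 branch above: only when every nacked part maps to a known origin index is the richer, per-part batch error preserved.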

internal/pipeline/processor_test.go (+108 -0)

@@ -352,3 +352,111 @@ func TestProcessorMultiMsgsBatchError(t *testing.T) {
         t.Error("Expected mockproc to have waited for close")
     }
 }
+
+type mockPhantomProcessor struct {
+    hasClosedAsync    bool
+    hasWaitedForClose bool
+    mut               sync.Mutex
+}
+
+func (m *mockPhantomProcessor) ProcessBatch(ctx context.Context, msg message.Batch) ([]message.Batch, error) {
+    var msgs []message.Batch
+    for _, p := range msg {
+        tmpMsg := p.ShallowCopy()
+        tmpMsg.SetBytes(fmt.Appendf(nil, "%s test", p.AsBytes()))
+        msgs = append(msgs, message.Batch{tmpMsg})
+    }
+    msgs = append(msgs, message.Batch{
+        message.NewPart([]byte("phantom message")),
+    })
+    return msgs, nil
+}
+
+func (m *mockPhantomProcessor) Close(ctx context.Context) error {
+    m.mut.Lock()
+    m.hasClosedAsync = true
+    m.hasWaitedForClose = true
+    m.mut.Unlock()
+    return nil
+}
+
+func TestProcessorMultiMsgsBatchUnknownError(t *testing.T) {
+    ctx, done := context.WithTimeout(context.Background(), time.Second*30)
+    defer done()
+
+    mockProc := &mockPhantomProcessor{}
+    proc := pipeline.NewProcessor(mockProc)
+
+    tChan, resChan := make(chan message.Transaction), make(chan error)
+
+    require.NoError(t, proc.Consume(tChan))
+
+    _, inputBatch := message.NewSortGroup(message.Batch{
+        message.NewPart([]byte("foo")),
+        message.NewPart([]byte("bar")),
+        message.NewPart([]byte("baz")),
+    })
+
+    // Send message
+    select {
+    case tChan <- message.NewTransaction(inputBatch, resChan):
+    case <-time.After(time.Second):
+        t.Error("Timed out")
+    }
+
+    expMsgs := map[string]struct{}{
+        "foo test":        {},
+        "bar test":        {},
+        "baz test":        {},
+        "phantom message": {},
+    }
+
+    resFns := []func(context.Context, error) error{}
+
+    // Receive expected messages
+    for i := 0; i < 4; i++ {
+        select {
+        case procT, open := <-proc.TransactionChan():
+            require.True(t, open)
+
+            act := string(procT.Payload.Get(0).AsBytes())
+            if _, exists := expMsgs[act]; !exists {
+                t.Errorf("Unexpected result: %v", act)
+            } else {
+                delete(expMsgs, act)
+            }
+            resFns = append(resFns, procT.Ack)
+        case <-time.After(time.Second):
+            t.Error("Timed out")
+        }
+    }
+
+    assert.Empty(t, expMsgs)
+    require.Len(t, resFns, 4)
+
+    require.NoError(t, resFns[0](ctx, nil))
+    require.NoError(t, resFns[1](ctx, nil))
+    require.NoError(t, resFns[2](ctx, nil))
+    require.NoError(t, resFns[3](ctx, errors.New("oh no")))
+
+    // Receive overall ack
+    select {
+    case err, open := <-resChan:
+        require.True(t, open)
+        require.EqualError(t, err, "oh no")
+
+        var batchErr *batch.Error
+        require.False(t, errors.As(err, &batchErr))
+    case <-time.After(time.Second):
+        t.Error("Timed out")
+    }
+
+    proc.TriggerCloseNow()
+    require.NoError(t, proc.WaitForClose(ctx))
+    if !mockProc.hasClosedAsync {
+        t.Error("Expected mockproc to have closed asynchronously")
+    }
+    if !mockProc.hasWaitedForClose {
+        t.Error("Expected mockproc to have waited for close")
+    }
+}
