Skip to content

Commit

Permalink
Add reject_errored output
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Apr 12, 2024
1 parent 4d72f96 commit 282896f
Show file tree
Hide file tree
Showing 8 changed files with 910 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file.
- New `retry` processor.
- New `noop` cache.
- Field `targets_input` added to the `azure_blob_storage` input.
- New `reject_errored` output.

### Fixed

Expand Down
3 changes: 2 additions & 1 deletion internal/impl/nats/metadata.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package nats

import (
"github.com/benthosdev/benthos/v4/public/service"
"github.com/nats-io/nats.go/jetstream"

"github.com/benthosdev/benthos/v4/public/service"
)

const (
Expand Down
52 changes: 45 additions & 7 deletions internal/impl/pure/output_fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/benthosdev/benthos/v4/internal/batch"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/interop"
"github.com/benthosdev/benthos/v4/internal/component/output"
Expand Down Expand Up @@ -180,28 +181,65 @@ func (t *fallbackBroker) loop() {
return
}

outSorter, outBatch := message.NewSortGroup(tran.Payload)
nextBatchFromErr := func(err error) message.Batch {
var bErr *batch.Error
if len(outBatch) <= 1 || !errors.As(err, &bErr) {
tmpBatch := outBatch.ShallowCopy()
for _, m := range tmpBatch {
m.MetaSetMut("fallback_error", err.Error())
}
return tmpBatch
}

var onlyErrs message.Batch
seenIndexes := map[int]struct{}{}
bErr.WalkPartsBySource(outSorter, outBatch, func(i int, p *message.Part, err error) bool {
if err != nil && p != nil {
if _, exists := seenIndexes[i]; exists {
return true
}
seenIndexes[i] = struct{}{}
tmp := p.ShallowCopy()
tmp.MetaSetMut("fallback_error", err.Error())
onlyErrs = append(onlyErrs, tmp)
}
return true
})

// This is an edge case that means the only failed messages aren't
// capable of being associated with our origin batch. To be safe we
// fall everything through.
if len(onlyErrs) == 0 {
tmpBatch := outBatch.ShallowCopy()
for _, m := range tmpBatch {
m.MetaSetMut("fallback_error", err.Error())
}
return tmpBatch
}

outSorter, outBatch = message.NewSortGroup(onlyErrs)
return outBatch
}

i := 0
var ackFn func(ctx context.Context, err error) error
ackFn = func(ctx context.Context, err error) error {
i++
if err == nil || len(t.outputTSChans) <= i {
return tran.Ack(ctx, err)
}
newPayload := tran.Payload.ShallowCopy()
_ = newPayload.Iter(func(i int, p *message.Part) error {
p.MetaSetMut("fallback_error", err.Error())
return nil
})

select {
case t.outputTSChans[i] <- message.NewTransactionFunc(newPayload, ackFn):
case t.outputTSChans[i] <- message.NewTransactionFunc(nextBatchFromErr(err), ackFn):
case <-ctx.Done():
return ctx.Err()
}
return nil
}

select {
case t.outputTSChans[i] <- message.NewTransactionFunc(tran.Payload.ShallowCopy(), ackFn):
case t.outputTSChans[i] <- message.NewTransactionFunc(outBatch.ShallowCopy(), ackFn):
case <-t.shutSig.CloseAtLeisureChan():
return
}
Expand Down
11 changes: 3 additions & 8 deletions internal/impl/pure/output_fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,9 @@ func TestFallbackHappyishPath(t *testing.T) {
resChan := make(chan error)

oTM, err := newFallbackBroker(outputs)
if err != nil {
t.Error(err)
return
}
if err = oTM.Consume(readChan); err != nil {
t.Error(err)
return
}
require.NoError(t, err)

require.NoError(t, oTM.Consume(readChan))

for i := 0; i < 10; i++ {
content := [][]byte{[]byte(fmt.Sprintf("hello world %v", i))}
Expand Down
279 changes: 279 additions & 0 deletions internal/impl/pure/output_reject_errored.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
package pure

import (
"context"
"errors"
"fmt"

"github.com/benthosdev/benthos/v4/internal/batch"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/interop"
"github.com/benthosdev/benthos/v4/internal/component/output"
"github.com/benthosdev/benthos/v4/internal/message"
"github.com/benthosdev/benthos/v4/internal/shutdown"
"github.com/benthosdev/benthos/v4/public/service"
)

// init registers the reject_errored batch output, along with its config spec
// and documentation examples, with the service package. It panics if the
// registration returns an error.
func init() {
	err := service.RegisterBatchOutput(
		"reject_errored", service.NewConfigSpec().
			Stable().
			Categories("Utility").
			Summary(`Rejects messages that have failed their processing steps, resulting in nack behaviour at the input level, otherwise sends them to a child output.`).
			Description(`
The routing of messages rejected by this output depends on the type of input it came from. For inputs that support propagating nacks upstream such as AMQP or NATS the message will be nacked. However, for inputs that are sequential such as files or Kafka the messages will simply be reprocessed from scratch.`).
			Example(
				"Rejecting Failed Messages",
				`
The most straight forward use case for this output type is to nack messages that have failed their processing steps. In this example our mapping might fail, in which case the messages that failed are rejected and will be nacked by our input:`,
				// The example configs are YAML, so the nesting below is
				// significant: each component's fields must be indented
				// beneath it for the example to parse as intended.
				`
input:
  nats_jetstream:
    urls: [ nats://127.0.0.1:4222 ]
    subject: foos.pending
pipeline:
  processors:
    - mutation: 'root.age = this.fuzzy.age.int64()'
output:
  reject_errored:
    nats_jetstream:
      urls: [ nats://127.0.0.1:4222 ]
      subject: foos.processed
`,
			).
			Example(
				"DLQing Failed Messages",
				`
Another use case for this output is to send failed messages straight into a dead-letter queue. We use it within a [fallback output](/docs/components/outputs/fallback) that allows us to specify where these failed messages should go to next.`,
				`
pipeline:
  processors:
    - mutation: 'root.age = this.fuzzy.age.int64()'
output:
  fallback:
    - reject_errored:
        http_client:
          url: http://foo:4195/post/might/become/unreachable
          retries: 3
          retry_period: 1s
    - http_client:
        url: http://bar:4196/somewhere/else
        retries: 3
        retry_period: 1s
`,
			).
			Field(service.NewOutputField("")),
		// Constructor: builds the broker from the parsed config and wraps it
		// so it can be returned as a service.BatchOutput. The batch policy
		// and max in flight results are left at their zero values.
		func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error) {
			var w *rejectErroredBroker
			if w, err = newRejectErroredFromParsed(conf, mgr); err != nil {
				return
			}

			out = interop.NewUnwrapInternalOutput(w)
			return
		})
	if err != nil {
		panic(err)
	}
}

//------------------------------------------------------------------------------

// newRejectErroredFromParsed extracts the child output from the parsed config,
// unwraps it into its internal form and hands it to newRejectErroredBroker.
// Any error from reading the field or from building the broker is returned
// unchanged alongside a nil broker.
func newRejectErroredFromParsed(conf *service.ParsedConfig, res *service.Resources) (*rejectErroredBroker, error) {
	child, err := conf.FieldOutput()
	if err != nil {
		return nil, err
	}

	broker, err := newRejectErroredBroker(interop.UnwrapOwnedOutput(child), res)
	if err != nil {
		return nil, err
	}
	return broker, nil
}

// rejectErroredBroker sits in front of a single child output. It nacks
// messages that carry a processing error and forwards everything else to the
// child.
type rejectErroredBroker struct {
	// log is the logger taken from the resources given at construction.
	log *service.Logger

	// shutSig coordinates shutdown of the broker's loop.
	shutSig *shutdown.Signaller

	// transactions is the upstream transaction channel. It stays nil until
	// Consume assigns it.
	transactions <-chan message.Transaction

	// output is the child output that non-rejected messages are sent to.
	output output.Streamed

	// outputTSChan is the channel the child output consumes from.
	outputTSChan chan message.Transaction
}

// newRejectErroredBroker builds a broker around the given child output and
// immediately starts the child consuming from the broker's internal
// transaction channel. If the child refuses to consume, that error is returned
// and no broker is produced. The broker's own loop is not started here; that
// happens in Consume.
func newRejectErroredBroker(output output.Streamed, res *service.Resources) (*rejectErroredBroker, error) {
	broker := &rejectErroredBroker{
		log:          res.Logger(),
		output:       output,
		outputTSChan: make(chan message.Transaction),
		shutSig:      shutdown.NewSignaller(),
	}

	if err := broker.output.Consume(broker.outputTSChan); err != nil {
		return nil, err
	}
	return broker, nil
}

//------------------------------------------------------------------------------

// Consume sets the upstream transaction channel the broker reads from and
// starts the broker loop in its own goroutine. It returns
// component.ErrAlreadyStarted if a channel has already been assigned.
func (t *rejectErroredBroker) Consume(ts <-chan message.Transaction) error {
	if alreadyStarted := t.transactions != nil; alreadyStarted {
		return component.ErrAlreadyStarted
	}

	t.transactions = ts
	go t.loop()

	return nil
}

// Connected reports the connection status of the child output; the broker has
// no connection state of its own.
func (t *rejectErroredBroker) Connected() bool {
	child := t.output
	return child.Connected()
}

//------------------------------------------------------------------------------

// loop is the broker's internal routine. It reads transactions from the
// upstream channel, rejects (nacks) any message that carries a processing
// error, and forwards the remaining messages to the single child output.
//
// Each transaction falls into one of four cases:
//   - A single message is nacked if it has an error, otherwise the
//     transaction is passed to the child untouched.
//   - A batch with no errored messages is passed to the child untouched.
//   - A batch in which every message errored is nacked immediately with a
//     batch error.
//   - A mixed batch is reduced to its non-errored messages, which are sent to
//     the child; the child's ack or nack is then merged with our rejections
//     before acknowledging the original transaction.
//
// The loop exits when the upstream channel closes or shutdown is signalled.
// On exit it closes the child's transaction channel, triggers the child to
// close, waits for it, and marks the broker's shutdown as complete.
func (t *rejectErroredBroker) loop() {
	defer func() {
		close(t.outputTSChan)
		t.output.TriggerCloseNow()
		// The result is deliberately ignored: we are already tearing down
		// and there is no caller to report it to.
		_ = t.output.WaitForClose(context.Background())
		t.shutSig.ShutdownComplete()
	}()

	// Context used for nacks issued directly from this loop.
	// NOTE(review): presumably cancelled once a close-now is triggered on the
	// signaller - confirm against shutdown.Signaller.
	closeNowCtx, done := t.shutSig.CloseNowCtx(context.Background())
	defer done()

	for {
		var open bool
		var tran message.Transaction

		select {
		case tran, open = <-t.transactions:
			if !open {
				return
			}
		case <-t.shutSig.CloseAtLeisureChan():
			return
		}

		if len(tran.Payload) == 1 {
			// No need for pretentious batch fluffery when there's only one
			// message.
			if err := tran.Payload[0].ErrorGet(); err != nil {
				if aerr := tran.Ack(closeNowCtx, fmt.Errorf("rejecting due to failed processing: %w", err)); aerr != nil {
					t.log.With("error", aerr).Warn("Failed to nack rejected message")
				}
				continue
			}

			select {
			case t.outputTSChan <- tran:
			case <-t.shutSig.CloseNowChan():
				return
			}
			continue
		}

		// Check for any failed messages in the batch. The batch error is only
		// allocated once the first failure is seen, so a nil batchErr after
		// this scan means nothing failed.
		var batchErr *batch.Error
		for i, m := range tran.Payload {
			err := m.ErrorGet()
			if err == nil {
				continue
			}
			err = fmt.Errorf("rejecting due to failed processing: %w", err)
			if batchErr == nil {
				batchErr = batch.NewError(tran.Payload, err)
			}
			batchErr.Failed(i, err)
		}

		// If no messages failed we can pass the batch through unchanged.
		if batchErr == nil {
			select {
			case t.outputTSChan <- tran:
			case <-t.shutSig.CloseNowChan():
				return
			}
			continue
		}

		// If all messages failed then we can nack the entire batch immediately.
		// A failure to deliver the nack is logged, matching the handling of a
		// rejected single message above, rather than silently dropped.
		if batchErr.IndexedErrors() == len(tran.Payload) {
			if aerr := tran.Ack(closeNowCtx, batchErr); aerr != nil {
				t.log.With("error", aerr).Warn("Failed to nack rejected batch")
			}
			continue
		}

		// If we get here it means that we have a batch of messages that mixes
		// rejected and non-rejected. This is an awkward place to be because if
		// a nack were to come back from the transaction we need to merge it
		// into our existing batch error. For this we need a sort group.
		sortGroup, sortedBatch := message.NewSortGroup(tran.Payload)

		// Reduce batch down into only those we aren't rejecting.
		forwardBatch := make(message.Batch, 0, len(tran.Payload)-batchErr.IndexedErrors())
		batchErr.WalkPartsNaively(func(i int, _ *message.Part, perr error) bool {
			if perr == nil {
				forwardBatch = append(forwardBatch, sortedBatch[i])
			}
			return true
		})

		select {
		case t.outputTSChan <- message.NewTransactionFunc(forwardBatch, func(ctx context.Context, err error) error {
			if err == nil {
				// An ack is simpler, we return the batch error containing our
				// rejections and then move on.
				return tran.Ack(ctx, batchErr)
			}

			// The child reported per-message failures: fold each of them
			// into our own batch error so the upstream sees both our
			// rejections and the delivery failures.
			var tmpBatchErr *batch.Error
			if errors.As(err, &tmpBatchErr) {
				tmpBatchErr.WalkPartsBySource(sortGroup, sortedBatch, func(i int, _ *message.Part, perr error) bool {
					if perr != nil {
						batchErr.Failed(i, perr)
					}
					return true
				})
				return tran.Ack(ctx, batchErr)
			}

			// If the nack returned isn't a batch error then it's batch-wide,
			// this means all messages were either rejected or failed to be
			// delivered.
			for _, p := range forwardBatch {
				if i := sortGroup.GetIndex(p); i >= 0 {
					batchErr.Failed(i, err)
				}
			}
			return tran.Ack(ctx, batchErr)
		}):
		case <-t.shutSig.CloseNowChan():
			return
		}
	}
}

// TriggerCloseNow signals the broker to shut down immediately by invoking
// CloseNow on its shutdown signaller. It does not wait for the shutdown to
// finish; use WaitForClose for that.
func (t *rejectErroredBroker) TriggerCloseNow() {
	sig := t.shutSig
	sig.CloseNow()
}

// WaitForClose blocks until the broker's shutdown signaller reports that it
// has closed, returning nil, or until ctx is done, in which case ctx.Err() is
// returned.
func (t *rejectErroredBroker) WaitForClose(ctx context.Context) error {
	closed := t.shutSig.HasClosedChan()

	select {
	case <-closed:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
Loading

0 comments on commit 282896f

Please sign in to comment.