Skip to content

Commit 71ec64b

Browse files
committed
Tidy up json_documents scanner
1 parent 41b0f37 commit 71ec64b

File tree

4 files changed

+59
-48
lines changed

4 files changed

+59
-48
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ All notable changes to this project will be documented in this file.
1717
- Field `targets_input` added to the `azure_blob_storage` input.
1818
- New `reject_errored` output.
1919
- New `nats_request_reply` processor.
20+
- New `json_documents` scanner.
2021

2122
### Fixed
2223

internal/impl/pure/scanner_json.go

+12-36
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,37 @@ package pure
22

33
import (
44
"context"
5-
json "encoding/json"
6-
"errors"
5+
"encoding/json"
76
"io"
87

98
"github.com/benthosdev/benthos/v4/public/service"
109
)
1110

12-
const (
13-
sjsonFieldContinueOnError = "continue_on_error"
14-
)
15-
1611
func jsonDocumentScannerSpec() *service.ConfigSpec {
1712
return service.NewConfigSpec().
1813
Stable().
14+
Version("4.27.0").
1915
Summary("Consumes a stream of one or more JSON documents.").
20-
Fields(
21-
service.NewBoolField(sjsonFieldContinueOnError).
22-
Description("If a JSON decoding fails due to any error emit an empty message marked with the error and then continue consuming subsequent documents when possible.").
23-
Default(false),
24-
)
16+
// Just a placeholder empty object as we don't have any fields yet
17+
Field(service.NewObjectField("").Default(map[string]any{}))
2518
}
2619

2720
func init() {
2821
err := service.RegisterBatchScannerCreator("json_documents", jsonDocumentScannerSpec(),
2922
func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchScannerCreator, error) {
30-
return jsonDocumentScannerFromParsed(conf)
23+
return &jsonDocumentScannerCreator{}, nil
3124
})
3225
if err != nil {
3326
panic(err)
3427
}
3528
}
3629

37-
func jsonDocumentScannerFromParsed(conf *service.ParsedConfig) (l *jsonDocumentScannerCreator, err error) {
38-
l = &jsonDocumentScannerCreator{}
39-
if l.continueOnError, err = conf.FieldBool(sjsonFieldContinueOnError); err != nil {
40-
return
41-
}
42-
return
43-
}
44-
45-
type jsonDocumentScannerCreator struct {
46-
continueOnError bool
47-
}
30+
type jsonDocumentScannerCreator struct{}
4831

4932
func (js *jsonDocumentScannerCreator) Create(rdr io.ReadCloser, aFn service.AckFunc, details *service.ScannerSourceDetails) (service.BatchScanner, error) {
5033
return service.AutoAggregateBatchScannerAcks(&jsonDocumentScanner{
51-
d: json.NewDecoder(rdr),
52-
r: rdr,
53-
continueOnError: js.continueOnError,
34+
d: json.NewDecoder(rdr),
35+
r: rdr,
5436
}, aFn), nil
5537
}
5638

@@ -61,27 +43,21 @@ func (js *jsonDocumentScannerCreator) Close(context.Context) error {
6143
type jsonDocumentScanner struct {
6244
d *json.Decoder
6345
r io.ReadCloser
64-
65-
continueOnError bool
6646
}
6747

6848
func (js *jsonDocumentScanner) NextBatch(ctx context.Context) (service.MessageBatch, error) {
6949
if js.r == nil {
7050
return nil, io.EOF
7151
}
7252

73-
msg := service.NewMessage(nil)
74-
7553
var jsonDocObj any
76-
7754
if err := js.d.Decode(&jsonDocObj); err != nil {
78-
msg.SetError(err)
79-
if errors.Is(err, io.EOF) || !js.continueOnError {
80-
return nil, err
81-
}
55+
_ = js.r.Close()
56+
js.r = nil
57+
return nil, err
8258
}
8359

84-
// Decode will determine whether a the jsonObj is of type map[string]interface{} or []interface{}
60+
msg := service.NewMessage(nil)
8561
msg.SetStructuredMut(jsonDocObj)
8662

8763
return service.MessageBatch{msg}, nil

internal/impl/pure/scanner_json_test.go

+43
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package pure_test
22

33
import (
4+
"context"
5+
"io"
6+
"strings"
47
"testing"
58

9+
"github.com/stretchr/testify/assert"
610
"github.com/stretchr/testify/require"
711

812
"github.com/benthosdev/benthos/v4/internal/component/scanner/testutil"
@@ -34,6 +38,45 @@ test:
3438
)
3539
}
3640

41+
func TestJSONScannerBadData(t *testing.T) {
42+
confSpec := service.NewConfigSpec().Field(service.NewScannerField("test"))
43+
pConf, err := confSpec.ParseYAML(`
44+
test:
45+
json_documents: {}
46+
`, nil)
47+
require.NoError(t, err)
48+
49+
rdr, err := pConf.FieldScanner("test")
50+
require.NoError(t, err)
51+
52+
var ack error
53+
54+
scanner, err := rdr.Create(io.NopCloser(strings.NewReader(`{"a":"a0"}
55+
nope !@ not good json
56+
{"a":"a1"}
57+
`)), func(ctx context.Context, err error) error {
58+
ack = err
59+
return nil
60+
}, &service.ScannerSourceDetails{})
61+
require.NoError(t, err)
62+
63+
resBatch, aFn, err := scanner.NextBatch(context.Background())
64+
require.NoError(t, err)
65+
require.NoError(t, aFn(context.Background(), nil))
66+
require.Len(t, resBatch, 1)
67+
mBytes, err := resBatch[0].AsBytes()
68+
require.NoError(t, err)
69+
assert.Equal(t, `{"a":"a0"}`, string(mBytes))
70+
71+
_, _, err = scanner.NextBatch(context.Background())
72+
assert.Error(t, err)
73+
74+
_, _, err = scanner.NextBatch(context.Background())
75+
assert.ErrorIs(t, err, io.EOF)
76+
77+
assert.ErrorContains(t, ack, "invalid character")
78+
}
79+
3780
func TestJSONScannerFormatted(t *testing.T) {
3881
confSpec := service.NewConfigSpec().Field(service.NewScannerField("test"))
3982
pConf, err := confSpec.ParseYAML(`

website/docs/components/scanners/json_documents.md

+3-12
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,11 @@ import TabItem from '@theme/TabItem';
1616

1717
Consumes a stream of one or more JSON documents.
1818

19+
Introduced in version 4.27.0.
20+
1921
```yml
2022
# Config fields, showing default values
21-
json_documents:
22-
continue_on_error: false
23+
json_documents: {}
2324
```
2425
25-
## Fields
26-
27-
### `continue_on_error`
28-
29-
If a JSON decoding fails due to any error emit an empty message marked with the error and then continue consuming subsequent documents when possible.
30-
31-
32-
Type: `bool`
33-
Default: `false`
34-
3526

0 commit comments

Comments
 (0)