Skip to content

Commit 3a7dfc1

Browse files
committed
Remove use of AWS mocks (prepping for v2)
1 parent b16f01a commit 3a7dfc1

20 files changed

+65
-50
lines changed

internal/impl/aws/cache_dynamodb.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/aws/aws-sdk-go/aws"
1111
"github.com/aws/aws-sdk-go/aws/awserr"
1212
"github.com/aws/aws-sdk-go/service/dynamodb"
13-
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
1413
"github.com/aws/aws-sdk-go/service/dynamodb/expression"
1514
"github.com/cenkalti/backoff/v4"
1615

@@ -127,7 +126,7 @@ func newDynamodbCacheFromConfig(conf *service.ParsedConfig) (*dynamodbCache, err
127126
//------------------------------------------------------------------------------
128127

129128
type dynamodbCache struct {
130-
client dynamodbiface.DynamoDBAPI
129+
client dynamoDBAPI
131130

132131
table *string
133132
hashKey string
@@ -140,7 +139,7 @@ type dynamodbCache struct {
140139
}
141140

142141
func newDynamodbCache(
143-
client dynamodbiface.DynamoDBAPI,
142+
client dynamoDBAPI,
144143
table, hashKey, dataKey string,
145144
consistentRead bool,
146145
ttlKey *string, ttl *time.Duration,

internal/impl/aws/cache_s3.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/aws/aws-sdk-go/aws"
1111
"github.com/aws/aws-sdk-go/aws/awserr"
1212
"github.com/aws/aws-sdk-go/service/s3"
13-
"github.com/aws/aws-sdk-go/service/s3/s3iface"
1413
"github.com/cenkalti/backoff/v4"
1514

1615
"github.com/benthosdev/benthos/v4/internal/impl/aws/config"
@@ -94,15 +93,15 @@ func newS3CacheFromConfig(conf *service.ParsedConfig) (*s3Cache, error) {
9493
//------------------------------------------------------------------------------
9594

9695
type s3Cache struct {
97-
s3 s3iface.S3API
96+
s3 *s3.S3
9897

9998
bucket string
10099
contentType string
101100

102101
boffPool sync.Pool
103102
}
104103

105-
func newS3Cache(bucket, contentType string, backOff *backoff.ExponentialBackOff, s3 s3iface.S3API) *s3Cache {
104+
func newS3Cache(bucket, contentType string, backOff *backoff.ExponentialBackOff, s3 *s3.S3) *s3Cache {
106105
return &s3Cache{
107106
s3: s3,
108107

internal/impl/aws/input_kinesis.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/aws/aws-sdk-go/aws/awserr"
1414
"github.com/aws/aws-sdk-go/aws/session"
1515
"github.com/aws/aws-sdk-go/service/kinesis"
16-
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
1716
"github.com/cenkalti/backoff/v4"
1817
"github.com/gofrs/uuid"
1918

@@ -181,7 +180,7 @@ type kinesisReader struct {
181180

182181
boffPool sync.Pool
183182

184-
svc kinesisiface.KinesisAPI
183+
svc *kinesis.Kinesis
185184
checkpointer *awsKinesisCheckpointer
186185

187186
streamShards map[string][]string

internal/impl/aws/input_kinesis_checkpointer.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/aws/aws-sdk-go/aws/awserr"
1313
"github.com/aws/aws-sdk-go/aws/session"
1414
"github.com/aws/aws-sdk-go/service/dynamodb"
15-
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
1615

1716
"github.com/benthosdev/benthos/v4/public/service"
1817
)
@@ -57,7 +56,7 @@ type awsKinesisCheckpointer struct {
5756
clientID string
5857
leaseDuration time.Duration
5958
commitPeriod time.Duration
60-
svc dynamodbiface.DynamoDBAPI
59+
svc *dynamodb.DynamoDB
6160
}
6261

6362
// newAWSKinesisCheckpointer creates a new DynamoDB checkpointer from an AWS

internal/impl/aws/input_sqs.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/aws/aws-sdk-go/aws/request"
1212
"github.com/aws/aws-sdk-go/aws/session"
1313
"github.com/aws/aws-sdk-go/service/sqs"
14-
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
1514
"github.com/cenkalti/backoff/v4"
1615

1716
"github.com/benthosdev/benthos/v4/internal/component"
@@ -130,11 +129,18 @@ func init() {
130129

131130
//------------------------------------------------------------------------------
132131

132+
type sqsAPI interface {
133+
ReceiveMessageWithContext(aws.Context, *sqs.ReceiveMessageInput, ...request.Option) (*sqs.ReceiveMessageOutput, error)
134+
DeleteMessageBatchWithContext(aws.Context, *sqs.DeleteMessageBatchInput, ...request.Option) (*sqs.DeleteMessageBatchOutput, error)
135+
ChangeMessageVisibilityBatchWithContext(aws.Context, *sqs.ChangeMessageVisibilityBatchInput, ...request.Option) (*sqs.ChangeMessageVisibilityBatchOutput, error)
136+
SendMessageBatchWithContext(aws.Context, *sqs.SendMessageBatchInput, ...request.Option) (*sqs.SendMessageBatchOutput, error)
137+
}
138+
133139
type awsSQSReader struct {
134140
conf sqsiConfig
135141

136142
session *session.Session
137-
sqs sqsiface.SQSAPI
143+
sqs sqsAPI
138144

139145
messagesChan chan *sqs.Message
140146
ackMessagesChan chan sqsMessageHandle

internal/impl/aws/input_sqs_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@ import (
1111
"github.com/aws/aws-sdk-go/aws/request"
1212
"github.com/aws/aws-sdk-go/aws/session"
1313
"github.com/aws/aws-sdk-go/service/sqs"
14-
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
1514
"github.com/stretchr/testify/require"
1615
)
1716

1817
type mockSqsInput struct {
19-
sqsiface.SQSAPI
18+
sqsAPI
2019

2120
mtx chan struct{}
2221
queueTimeout int

internal/impl/aws/metrics_cloudwatch.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/aws/aws-sdk-go/aws/request"
1212
"github.com/aws/aws-sdk-go/aws/session"
1313
"github.com/aws/aws-sdk-go/service/cloudwatch"
14-
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
1514

1615
"github.com/benthosdev/benthos/v4/internal/component/metrics"
1716
"github.com/benthosdev/benthos/v4/internal/impl/aws/config"
@@ -278,8 +277,12 @@ func (c *cloudWatchGaugeVec) With(labelValues ...string) metrics.StatGauge {
278277

279278
//------------------------------------------------------------------------------
280279

280+
type cloudWatchAPI interface {
281+
PutMetricDataWithContext(aws.Context, *cloudwatch.PutMetricDataInput, ...request.Option) (*cloudwatch.PutMetricDataOutput, error)
282+
}
283+
281284
type cwMetrics struct {
282-
client cloudwatchiface.CloudWatchAPI
285+
client cloudWatchAPI
283286

284287
datumses map[string]*cloudWatchDatum
285288
datumLock *sync.Mutex
@@ -485,7 +488,7 @@ func (c *cwMetrics) flush() error {
485488
}
486489
throttled = false
487490

488-
if _, err := c.client.PutMetricData(&input); err != nil {
491+
if _, err := c.client.PutMetricDataWithContext(context.Background(), &input); err != nil {
489492
if request.IsErrorThrottle(err) {
490493
throttled = true
491494
c.log.Warn("Metrics request was throttled. Either increase flush period or reduce number of services sending metrics.")

internal/impl/aws/metrics_cloudwatch_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,19 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/aws/aws-sdk-go/aws"
11+
"github.com/aws/aws-sdk-go/aws/request"
1012
"github.com/aws/aws-sdk-go/service/cloudwatch"
11-
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
1213
"github.com/stretchr/testify/assert"
1314
)
1415

1516
type mockCloudWatchClient struct {
16-
cloudwatchiface.CloudWatchAPI
1717
errs []error
1818

1919
inputs []cloudwatch.PutMetricDataInput
2020
}
2121

22-
func cwmMock(svc cloudwatchiface.CloudWatchAPI) *cwMetrics {
22+
func cwmMock(svc cloudWatchAPI) *cwMetrics {
2323
return &cwMetrics{
2424
config: cwmConfig{Namespace: "Benthos", FlushPeriod: 100 * time.Millisecond},
2525
datumses: map[string]*cloudWatchDatum{},
@@ -29,7 +29,7 @@ func cwmMock(svc cloudwatchiface.CloudWatchAPI) *cwMetrics {
2929
}
3030
}
3131

32-
func (m *mockCloudWatchClient) PutMetricData(input *cloudwatch.PutMetricDataInput) (*cloudwatch.PutMetricDataOutput, error) {
32+
func (m *mockCloudWatchClient) PutMetricDataWithContext(ctx aws.Context, input *cloudwatch.PutMetricDataInput, opts ...request.Option) (*cloudwatch.PutMetricDataOutput, error) {
3333
m.inputs = append(m.inputs, *input)
3434
if len(m.errs) > 0 {
3535
err := m.errs[0]

internal/impl/aws/output_dynamodb.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import (
1111

1212
"github.com/Jeffail/gabs/v2"
1313
"github.com/aws/aws-sdk-go/aws"
14+
"github.com/aws/aws-sdk-go/aws/request"
1415
"github.com/aws/aws-sdk-go/aws/session"
1516
"github.com/aws/aws-sdk-go/service/dynamodb"
16-
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
1717
"github.com/cenkalti/backoff/v4"
1818
"github.com/google/go-cmp/cmp"
1919

@@ -171,8 +171,17 @@ func init() {
171171
}
172172
}
173173

174+
type dynamoDBAPI interface {
175+
PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error)
176+
BatchWriteItem(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error)
177+
BatchExecuteStatementWithContext(aws.Context, *dynamodb.BatchExecuteStatementInput, ...request.Option) (*dynamodb.BatchExecuteStatementOutput, error)
178+
DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.DescribeTableOutput, error)
179+
GetItem(*dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error)
180+
DeleteItem(*dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error)
181+
}
182+
174183
type dynamoDBWriter struct {
175-
client dynamodbiface.DynamoDBAPI
184+
client dynamoDBAPI
176185
conf ddboConfig
177186
log *service.Logger
178187

internal/impl/aws/output_dynamodb_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@ import (
77

88
"github.com/aws/aws-sdk-go/aws"
99
"github.com/aws/aws-sdk-go/service/dynamodb"
10-
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
1110
"github.com/stretchr/testify/assert"
1211
"github.com/stretchr/testify/require"
1312

1413
"github.com/benthosdev/benthos/v4/public/service"
1514
)
1615

1716
type mockDynamoDB struct {
18-
dynamodbiface.DynamoDBAPI
17+
dynamoDBAPI
1918
fn func(*dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error)
2019
batchFn func(*dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error)
2120
}

internal/impl/aws/output_kinesis.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/aws/aws-sdk-go/aws"
99
"github.com/aws/aws-sdk-go/aws/session"
1010
"github.com/aws/aws-sdk-go/service/kinesis"
11-
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
1211
"github.com/cenkalti/backoff/v4"
1312

1413
"github.com/benthosdev/benthos/v4/internal/component"
@@ -111,9 +110,13 @@ const (
111110
mebibyte = 1048576
112111
)
113112

113+
type kinesisAPI interface {
114+
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
115+
}
116+
114117
type kinesisWriter struct {
115118
conf koConfig
116-
kinesis kinesisiface.KinesisAPI
119+
kinesis kinesisAPI
117120
log *service.Logger
118121
}
119122

internal/impl/aws/output_kinesis_firehose.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/aws/aws-sdk-go/aws"
99
"github.com/aws/aws-sdk-go/aws/session"
1010
"github.com/aws/aws-sdk-go/service/firehose"
11-
"github.com/aws/aws-sdk-go/service/firehose/firehoseiface"
1211
"github.com/cenkalti/backoff/v4"
1312

1413
"github.com/benthosdev/benthos/v4/internal/component"
@@ -91,8 +90,13 @@ func init() {
9190
}
9291
}
9392

93+
type firehoseAPI interface {
94+
DescribeDeliveryStream(*firehose.DescribeDeliveryStreamInput) (*firehose.DescribeDeliveryStreamOutput, error)
95+
PutRecordBatch(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error)
96+
}
97+
9498
type kinesisFirehoseWriter struct {
95-
firehose firehoseiface.FirehoseAPI
99+
firehose firehoseAPI
96100

97101
conf kfoConfig
98102
log *service.Logger

internal/impl/aws/output_kinesis_firehose_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,14 @@ import (
1010
"github.com/aws/aws-sdk-go/aws/credentials"
1111
"github.com/aws/aws-sdk-go/aws/session"
1212
"github.com/aws/aws-sdk-go/service/firehose"
13-
"github.com/aws/aws-sdk-go/service/firehose/firehoseiface"
1413
"github.com/cenkalti/backoff/v4"
1514
"github.com/stretchr/testify/require"
1615

1716
"github.com/benthosdev/benthos/v4/public/service"
1817
)
1918

2019
type mockKinesisFirehose struct {
21-
firehoseiface.FirehoseAPI
20+
firehoseAPI
2221
fn func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error)
2322
}
2423

internal/impl/aws/output_kinesis_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,13 @@ import (
88

99
"github.com/aws/aws-sdk-go/aws"
1010
"github.com/aws/aws-sdk-go/service/kinesis"
11-
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
1211
"github.com/stretchr/testify/assert"
1312
"github.com/stretchr/testify/require"
1413

1514
"github.com/benthosdev/benthos/v4/public/service"
1615
)
1716

1817
type mockKinesis struct {
19-
kinesisiface.KinesisAPI
2018
fn func(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
2119
}
2220

internal/impl/aws/output_sqs.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/aws/aws-sdk-go/aws"
1414
"github.com/aws/aws-sdk-go/aws/session"
1515
"github.com/aws/aws-sdk-go/service/sqs"
16-
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
1716
"github.com/cenkalti/backoff/v4"
1817

1918
"github.com/benthosdev/benthos/v4/internal/component"
@@ -136,7 +135,7 @@ func init() {
136135

137136
type sqsWriter struct {
138137
conf sqsoConfig
139-
sqs sqsiface.SQSAPI
138+
sqs sqsAPI
140139

141140
closer sync.Once
142141
closeChan chan struct{}
@@ -296,7 +295,7 @@ func (a *sqsWriter) WriteBatch(ctx context.Context, batch service.MessageBatch)
296295
wait := backOff.NextBackOff()
297296

298297
var batchResult *sqs.SendMessageBatchOutput
299-
if batchResult, err = a.sqs.SendMessageBatch(input); err != nil {
298+
if batchResult, err = a.sqs.SendMessageBatchWithContext(ctx, input); err != nil {
300299
a.log.Warnf("SQS error: %v\n", err)
301300
// bail if a message is too large or all retry attempts expired
302301
if wait == backoff.Stop {

internal/impl/aws/output_sqs_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88

99
"github.com/aws/aws-sdk-go/aws"
1010
"github.com/aws/aws-sdk-go/aws/credentials"
11+
"github.com/aws/aws-sdk-go/aws/request"
1112
"github.com/aws/aws-sdk-go/aws/session"
1213
"github.com/aws/aws-sdk-go/service/sqs"
13-
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
1414
"github.com/cenkalti/backoff/v4"
1515
"github.com/stretchr/testify/assert"
1616
"github.com/stretchr/testify/require"
@@ -111,11 +111,11 @@ func TestSQSHeaderCheck(t *testing.T) {
111111
}
112112

113113
type mockSqs struct {
114-
sqsiface.SQSAPI
114+
sqsAPI
115115
fn func(*sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error)
116116
}
117117

118-
func (m *mockSqs) SendMessageBatch(input *sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error) {
118+
func (m *mockSqs) SendMessageBatchWithContext(ctx aws.Context, input *sqs.SendMessageBatchInput, opts ...request.Option) (*sqs.SendMessageBatchOutput, error) {
119119
return m.fn(input)
120120
}
121121

internal/impl/aws/processor_dynamodb_partiql.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77

88
"github.com/aws/aws-sdk-go/service/dynamodb"
9-
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
109
"github.com/mitchellh/mapstructure"
1110

1211
"github.com/benthosdev/benthos/v4/internal/impl/aws/config"
@@ -81,7 +80,7 @@ pipeline:
8180

8281
type dynamoDBPartiQL struct {
8382
logger *service.Logger
84-
client dynamodbiface.DynamoDBAPI
83+
client dynamoDBAPI
8584

8685
query string
8786
dynQuery *service.InterpolatedString
@@ -90,7 +89,7 @@ type dynamoDBPartiQL struct {
9089

9190
func newDynamoDBPartiQL(
9291
logger *service.Logger,
93-
client dynamodbiface.DynamoDBAPI,
92+
client dynamoDBAPI,
9493
query string,
9594
dynQuery *service.InterpolatedString,
9695
args *bloblang.Executor,

0 commit comments

Comments
 (0)