Skip to content

Commit 4edd88c

Browse files
authored
Merge pull request #2945 from redpanda-data/woof
aws/sqs: support interpolating output queue URL
2 parents dc6bbb3 + cf9b2d9 commit 4edd88c

File tree

16 files changed

+154
-14
lines changed

16 files changed

+154
-14
lines changed

CHANGELOG.md

+11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,17 @@ Changelog
33

44
All notable changes to this project will be documented in this file.
55

6+
## 4.39.0 - TBD
7+
8+
### Added
9+
10+
- New `timeplus` input. (@ye11ow)
11+
- New `snowflake_streaming` output. (@rockwotj)
12+
13+
### Changed
14+
15+
- The `aws_sqs` output field `url` now supports interpolation functions. (@rockwotj)
16+
617
## 4.38.0 - 2024-10-17
718

819
### Added

docs/modules/components/pages/outputs/aws_sqs.adoc

+1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ This output benefits from sending messages as a batch for improved performance.
115115
=== `url`
116116
117117
The URL of the target SQS queue.
118+
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
118119
119120
120121
*Type*: `string`

internal/impl/aws/output_sqs.go

+29-6
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ const (
5050
)
5151

5252
type sqsoConfig struct {
53-
URL string
53+
URL *service.InterpolatedString
5454
MessageGroupID *service.InterpolatedString
5555
MessageDeduplicationID *service.InterpolatedString
5656
DelaySeconds *service.InterpolatedString
@@ -61,7 +61,7 @@ type sqsoConfig struct {
6161
}
6262

6363
func sqsoConfigFromParsed(pConf *service.ParsedConfig) (conf sqsoConfig, err error) {
64-
if conf.URL, err = pConf.FieldString(sqsoFieldURL); err != nil {
64+
if conf.URL, err = pConf.FieldInterpolatedString(sqsoFieldURL); err != nil {
6565
return
6666
}
6767
if pConf.Contains(sqsoFieldMessageGroupID) {
@@ -106,7 +106,7 @@ The fields `+"`message_group_id`, `message_deduplication_id` and `delay_seconds`
106106
107107
By default Redpanda Connect will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more in xref:guides:cloud/aws.adoc[].`+service.OutputPerformanceDocs(true, true)).
108108
Fields(
109-
service.NewStringField(sqsoFieldURL).Description("The URL of the target SQS queue."),
109+
service.NewInterpolatedStringField(sqsoFieldURL).Description("The URL of the target SQS queue."),
110110
service.NewInterpolatedStringField(sqsoFieldMessageGroupID).
111111
Description("An optional group ID to set for messages.").
112112
Optional(),
@@ -270,9 +270,11 @@ func (a *sqsWriter) WriteBatch(ctx context.Context, batch service.MessageBatch)
270270

271271
backOff := a.conf.backoffCtor()
272272

273-
entries := []types.SendMessageBatchRequestEntry{}
273+
entries := map[string][]types.SendMessageBatchRequestEntry{}
274274
attrMap := map[string]sqsAttributes{}
275275

276+
urlExecutor := batch.InterpolationExecutor(a.conf.URL)
277+
276278
for i := 0; i < len(batch); i++ {
277279
id := strconv.Itoa(i)
278280
attrs, err := a.getSQSAttributes(batch, i)
@@ -282,7 +284,11 @@ func (a *sqsWriter) WriteBatch(ctx context.Context, batch service.MessageBatch)
282284

283285
attrMap[id] = attrs
284286

285-
entries = append(entries, types.SendMessageBatchRequestEntry{
287+
url, err := urlExecutor.TryString(i)
288+
if err != nil {
289+
return fmt.Errorf("error interpolating %s: %w", sqsoFieldURL, err)
290+
}
291+
entries[url] = append(entries[url], types.SendMessageBatchRequestEntry{
286292
Id: &id,
287293
MessageBody: attrs.content,
288294
MessageAttributes: attrs.attrMap,
@@ -292,8 +298,25 @@ func (a *sqsWriter) WriteBatch(ctx context.Context, batch service.MessageBatch)
292298
})
293299
}
294300

301+
for url, entries := range entries {
302+
backOff.Reset()
303+
if err := a.writeChunk(ctx, url, entries, attrMap, backOff); err != nil {
304+
return err
305+
}
306+
}
307+
308+
return nil
309+
}
310+
311+
func (a *sqsWriter) writeChunk(
312+
ctx context.Context,
313+
url string,
314+
entries []types.SendMessageBatchRequestEntry,
315+
attrMap map[string]sqsAttributes,
316+
backOff backoff.BackOff,
317+
) error {
295318
input := &sqs.SendMessageBatchInput{
296-
QueueUrl: aws.String(a.conf.URL),
319+
QueueUrl: &url,
297320
Entries: entries,
298321
}
299322

internal/impl/aws/output_sqs_test.go

+97-3
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,10 @@ func TestSQSRetries(t *testing.T) {
146146
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("xxxxx", "xxxxx", "xxxxx")),
147147
)
148148
require.NoError(t, err)
149-
149+
url, err := service.NewInterpolatedString("http://foo.example.com")
150+
require.NoError(t, err)
150151
w, err := newSQSWriter(sqsoConfig{
151-
URL: "http://foo.example.com",
152+
URL: url,
152153
backoffCtor: func() backoff.BackOff {
153154
return backoff.NewExponentialBackOff()
154155
},
@@ -218,8 +219,10 @@ func TestSQSSendLimit(t *testing.T) {
218219
)
219220
require.NoError(t, err)
220221

222+
url, err := service.NewInterpolatedString("http://foo.example.com")
223+
require.NoError(t, err)
221224
w, err := newSQSWriter(sqsoConfig{
222-
URL: "http://foo.example.com",
225+
URL: url,
223226
backoffCtor: func() backoff.BackOff {
224227
return backoff.NewExponentialBackOff()
225228
},
@@ -281,3 +284,94 @@ func TestSQSSendLimit(t *testing.T) {
281284
},
282285
}, in)
283286
}
287+
288+
func TestSQSMultipleQueues(t *testing.T) {
289+
tCtx := context.Background()
290+
291+
conf, err := config.LoadDefaultConfig(context.Background(),
292+
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("xxxxx", "xxxxx", "xxxxx")),
293+
)
294+
require.NoError(t, err)
295+
296+
url, err := service.NewInterpolatedString("http://${!counter()%2}.example.com")
297+
require.NoError(t, err)
298+
w, err := newSQSWriter(sqsoConfig{
299+
URL: url,
300+
backoffCtor: func() backoff.BackOff {
301+
return backoff.NewExponentialBackOff()
302+
},
303+
aconf: conf,
304+
}, service.MockResources())
305+
require.NoError(t, err)
306+
307+
in := map[string][]inEntries{}
308+
sendCalls := 0
309+
w.sqs = &mockSqs{
310+
fn: func(smbi *sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error) {
311+
var e inEntries
312+
for _, entry := range smbi.Entries {
313+
e = append(e, inMsg{
314+
id: *entry.Id,
315+
content: *entry.MessageBody,
316+
})
317+
}
318+
if smbi.QueueUrl == nil {
319+
return nil, errors.New("nil queue URL")
320+
}
321+
in[*smbi.QueueUrl] = append(in[*smbi.QueueUrl], e)
322+
sendCalls++
323+
return &sqs.SendMessageBatchOutput{}, nil
324+
},
325+
}
326+
327+
inMsg := service.MessageBatch{}
328+
for i := 0; i < 30; i++ {
329+
inMsg = append(inMsg, service.NewMessage([]byte(fmt.Sprintf("hello world %v", i+1))))
330+
}
331+
require.NoError(t, w.WriteBatch(tCtx, inMsg))
332+
333+
assert.Equal(t, map[string][]inEntries{
334+
"http://0.example.com": {
335+
{
336+
{id: "1", content: "hello world 2"},
337+
{id: "3", content: "hello world 4"},
338+
{id: "5", content: "hello world 6"},
339+
{id: "7", content: "hello world 8"},
340+
{id: "9", content: "hello world 10"},
341+
{id: "11", content: "hello world 12"},
342+
{id: "13", content: "hello world 14"},
343+
{id: "15", content: "hello world 16"},
344+
{id: "17", content: "hello world 18"},
345+
{id: "19", content: "hello world 20"},
346+
},
347+
{
348+
{id: "21", content: "hello world 22"},
349+
{id: "23", content: "hello world 24"},
350+
{id: "25", content: "hello world 26"},
351+
{id: "27", content: "hello world 28"},
352+
{id: "29", content: "hello world 30"},
353+
},
354+
},
355+
"http://1.example.com": {
356+
{
357+
{id: "0", content: "hello world 1"},
358+
{id: "2", content: "hello world 3"},
359+
{id: "4", content: "hello world 5"},
360+
{id: "6", content: "hello world 7"},
361+
{id: "8", content: "hello world 9"},
362+
{id: "10", content: "hello world 11"},
363+
{id: "12", content: "hello world 13"},
364+
{id: "14", content: "hello world 15"},
365+
{id: "16", content: "hello world 17"},
366+
{id: "18", content: "hello world 19"},
367+
},
368+
{
369+
{id: "20", content: "hello world 21"},
370+
{id: "22", content: "hello world 23"},
371+
{id: "24", content: "hello world 25"},
372+
{id: "26", content: "hello world 27"},
373+
{id: "28", content: "hello world 29"},
374+
},
375+
},
376+
}, in)
377+
}

internal/impl/snowflake/output_snowflake_streaming.go

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
"github.com/redpanda-data/benthos/v4/public/bloblang"
1919
"github.com/redpanda-data/benthos/v4/public/service"
20+
2021
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming"
2122
)
2223

internal/impl/snowflake/streaming/compat_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ import (
1818
"testing"
1919
"time"
2020

21-
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128"
2221
"github.com/stretchr/testify/require"
22+
23+
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128"
2324
)
2425

2526
func TestEncryption(t *testing.T) {

internal/impl/snowflake/streaming/integration_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ import (
2424
"time"
2525

2626
"github.com/redpanda-data/benthos/v4/public/service"
27-
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming"
2827
"github.com/stretchr/testify/assert"
2928
"github.com/stretchr/testify/require"
29+
30+
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming"
3031
)
3132

3233
func msg(s string) *service.Message {

internal/impl/snowflake/streaming/rest.go

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/golang-jwt/jwt"
3131
"github.com/google/uuid"
3232
"github.com/redpanda-data/benthos/v4/public/service"
33+
3334
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128"
3435
"github.com/redpanda-data/connect/v4/internal/periodic"
3536
"github.com/redpanda-data/connect/v4/internal/typed"

internal/impl/snowflake/streaming/schema.go

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/dustin/go-humanize"
2020
"github.com/parquet-go/parquet-go"
21+
2122
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128"
2223
)
2324

internal/impl/snowflake/streaming/streaming.go

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cenkalti/backoff/v4"
2828
"github.com/parquet-go/parquet-go"
2929
"github.com/redpanda-data/benthos/v4/public/service"
30+
3031
"github.com/redpanda-data/connect/v4/internal/periodic"
3132
"github.com/redpanda-data/connect/v4/internal/typed"
3233
)

internal/impl/snowflake/streaming/userdata_converter.go

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/Jeffail/gabs/v2"
2222
"github.com/parquet-go/parquet-go"
2323
"github.com/redpanda-data/benthos/v4/public/bloblang"
24+
2425
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128"
2526
)
2627

internal/impl/snowflake/streaming/userdata_converter_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ import (
1717
"time"
1818

1919
"github.com/parquet-go/parquet-go"
20-
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128"
2120
"github.com/stretchr/testify/require"
21+
22+
"github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming/int128"
2223
)
2324

2425
type validateTestCase struct {

internal/impl/timeplus/input.go

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"syscall"
1010

1111
"github.com/redpanda-data/benthos/v4/public/service"
12+
1213
"github.com/redpanda-data/connect/v4/internal/impl/timeplus/driver"
1314
"github.com/redpanda-data/connect/v4/internal/impl/timeplus/http"
1415
)

internal/impl/timeplus/output.go

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sort"
88

99
"github.com/redpanda-data/benthos/v4/public/service"
10+
1011
"github.com/redpanda-data/connect/v4/internal/impl/timeplus/http"
1112
)
1213

internal/plugins/info.csv

+2-2
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ sync_response ,output ,sync_response ,0.0.0 ,certif
258258
sync_response ,processor ,sync_response ,0.0.0 ,certified ,n ,y ,y
259259
system_window ,buffer ,system_window ,3.53.0 ,certified ,n ,y ,y
260260
tar ,scanner ,tar ,0.0.0 ,certified ,n ,y ,y
261-
timeplus ,input ,timeplus ,0.0.0 ,community ,n ,n ,n
262-
timeplus ,output ,timeplus ,4.38.0 ,community ,n ,n ,n
261+
timeplus ,input ,timeplus ,4.39.0 ,community ,n ,y ,y
262+
timeplus ,output ,timeplus ,4.38.0 ,community ,n ,y ,y
263263
to_the_end ,scanner ,to_the_end ,0.0.0 ,certified ,n ,y ,y
264264
try ,processor ,try ,0.0.0 ,certified ,n ,y ,y
265265
ttlru ,cache ,ttlru ,0.0.0 ,community ,n ,y ,y

public/components/cloud/package.go

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
_ "github.com/redpanda-data/connect/v4/public/components/snowflake"
4747
_ "github.com/redpanda-data/connect/v4/public/components/splunk"
4848
_ "github.com/redpanda-data/connect/v4/public/components/sql/base"
49+
_ "github.com/redpanda-data/connect/v4/public/components/timeplus"
4950

5051
// Import all (supported) sql drivers.
5152
_ "github.com/go-sql-driver/mysql"

0 commit comments

Comments
 (0)