Skip to content

Commit 23f5308

Browse files
authored
Add cli flag for secrets backends (#2964)
* Pin main of benthos * Add secrets management * Update to enterprise headers * Ensure disable env lookup applies without custom secrets * Elif the env disable * Remove disable-env-lookup flag
1 parent 5ee8fb6 commit 23f5308

File tree

5 files changed

+233
-1
lines changed

5 files changed

+233
-1
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ require (
360360
github.com/tilinna/z85 v1.0.0 // indirect
361361
github.com/tklauser/go-sysconf v0.3.13 // indirect
362362
github.com/tklauser/numcpus v0.7.0 // indirect
363-
github.com/urfave/cli/v2 v2.27.4 // indirect
363+
github.com/urfave/cli/v2 v2.27.4
364364
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
365365
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
366366
github.com/xdg-go/stringprep v1.0.4 // indirect

internal/cli/enterprise.go

+26
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ import (
1616

1717
"github.com/redpanda-data/benthos/v4/public/service"
1818
"github.com/rs/xid"
19+
"github.com/urfave/cli/v2"
1920

2021
"github.com/redpanda-data/connect/v4/internal/impl/kafka/enterprise"
22+
"github.com/redpanda-data/connect/v4/internal/secrets"
2123
"github.com/redpanda-data/connect/v4/internal/telemetry"
2224
)
2325

@@ -39,6 +41,10 @@ func InitEnterpriseCLI(binaryName, version, dateBuilt string, schema *service.Co
3941
os.Exit(1)
4042
}
4143

44+
secretLookupFn := func(ctx context.Context, key string) (string, bool) {
45+
return "", false
46+
}
47+
4248
opts = append(opts,
4349
service.CLIOptSetVersion(version, dateBuilt),
4450
service.CLIOptSetBinaryName(binaryName),
@@ -82,6 +88,26 @@ func InitEnterpriseCLI(binaryName, version, dateBuilt string, schema *service.Co
8288
rpLogger.SetStreamSummary(s)
8389
return nil
8490
}),
91+
92+
// Secrets management
93+
service.CLIOptCustomRunFlags([]cli.Flag{
94+
&cli.StringSliceFlag{
95+
Name: "secrets",
96+
Usage: "Attempt to load secrets from a provided URN. If more than one entry is specified they will be attempted in order until a value is found. Environment variable lookups are specified with the URN `env:`, which by default is the only entry. In order to disable all secret lookups specify a single entry of `none:`.",
97+
Value: cli.NewStringSlice("env:"),
98+
},
99+
}, func(c *cli.Context) error {
100+
secretsURNs := c.StringSlice("secrets")
101+
if len(secretsURNs) > 0 {
102+
var err error
103+
secretLookupFn, err = secrets.ParseLookupURNs(c.Context, slog.New(rpLogger), secretsURNs...)
104+
return err
105+
}
106+
return nil
107+
}),
108+
service.CLIOptSetEnvVarLookup(func(ctx context.Context, key string) (string, bool) {
109+
return secretLookupFn(ctx, key)
110+
}),
85111
)
86112

87113
exitCode, err := service.RunCLIToCode(context.Background(), opts...)

internal/secrets/redis.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2024 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package secrets
10+
11+
import (
12+
"context"
13+
"errors"
14+
"log/slog"
15+
"net/url"
16+
17+
"github.com/redis/go-redis/v9"
18+
)
19+
20+
type redisSecretsClient struct {
21+
logger *slog.Logger
22+
client *redis.Client
23+
}
24+
25+
func (r *redisSecretsClient) lookup(ctx context.Context, key string) (string, bool) {
26+
res, err := r.client.Get(ctx, key).Result()
27+
if err != nil {
28+
if !errors.Is(err, redis.Nil) {
29+
// An error that isn't due to key-not-found gets logged
30+
r.logger.With("error", err, "key", key).Error("Failed to look up secret")
31+
}
32+
return "", false
33+
}
34+
return res, true
35+
}
36+
37+
func newRedisSecretsLookup(ctx context.Context, logger *slog.Logger, url *url.URL) (LookupFn, error) {
38+
opts, err := redis.ParseURL(url.String())
39+
if err != nil {
40+
return nil, err
41+
}
42+
43+
r := &redisSecretsClient{
44+
logger: logger,
45+
client: redis.NewClient(opts),
46+
}
47+
return r.lookup, nil
48+
}

internal/secrets/redis_test.go

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2024 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package secrets
10+
11+
import (
12+
"context"
13+
"fmt"
14+
"log/slog"
15+
"net/url"
16+
"testing"
17+
"time"
18+
19+
"github.com/ory/dockertest/v3"
20+
"github.com/redis/go-redis/v9"
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
24+
_ "github.com/redpanda-data/benthos/v4/public/components/pure"
25+
"github.com/redpanda-data/benthos/v4/public/service/integration"
26+
)
27+
28+
func TestIntegrationRedis(t *testing.T) {
29+
integration.CheckSkip(t)
30+
t.Parallel()
31+
32+
pool, err := dockertest.NewPool("")
33+
require.NoError(t, err)
34+
35+
pool.MaxWait = time.Second * 30
36+
resource, err := pool.Run("redis", "latest", nil)
37+
require.NoError(t, err)
38+
t.Cleanup(func() {
39+
assert.NoError(t, pool.Purge(resource))
40+
})
41+
42+
urlStr := fmt.Sprintf("redis://localhost:%v", resource.GetPort("6379/tcp"))
43+
uri, err := url.Parse(urlStr)
44+
if err != nil {
45+
t.Fatal(err)
46+
}
47+
48+
opts, err := redis.ParseURL(uri.String())
49+
if err != nil {
50+
t.Fatal(err)
51+
}
52+
53+
client := redis.NewClient(opts)
54+
55+
_ = resource.Expire(900)
56+
require.NoError(t, pool.Retry(func() error {
57+
return client.Ping(context.Background()).Err()
58+
}))
59+
60+
ctx, done := context.WithTimeout(context.Background(), time.Minute)
61+
defer done()
62+
63+
require.NoError(t, client.Set(ctx, "bar", "meow", time.Minute).Err())
64+
65+
secretsLookup, err := parseSecretsLookupURN(ctx, slog.Default(), urlStr)
66+
require.NoError(t, err)
67+
68+
v, exists := secretsLookup(ctx, "foo")
69+
assert.False(t, exists)
70+
assert.Equal(t, "", v)
71+
72+
v, exists = secretsLookup(ctx, "bar")
73+
assert.True(t, exists)
74+
assert.Equal(t, "meow", v)
75+
}

internal/secrets/secrets.go

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright 2024 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package secrets
10+
11+
import (
12+
"context"
13+
"fmt"
14+
"log/slog"
15+
"net/url"
16+
"os"
17+
)
18+
19+
// LookupFn defines the common closure that a secrets management client provides
20+
// and is then fed into a Redpanda Connect cli constructor.
21+
type LookupFn func(context.Context, string) (string, bool)
22+
23+
type lookupTiers []LookupFn
24+
25+
func (l lookupTiers) Lookup(ctx context.Context, key string) (string, bool) {
26+
for _, fn := range l {
27+
if v, ok := fn(ctx, key); ok {
28+
return v, ok
29+
}
30+
if ctx.Err() != nil {
31+
break
32+
}
33+
}
34+
return "", false
35+
}
36+
37+
// ParseLookupURNs attempts to parse a series of secrets lookup solutions
38+
// defined as URNs and returns a single lookup func for obtaining secrets from
39+
// them in the order provided.
40+
//
41+
// A toggle can be provided that determines whether environment variables should
42+
// be considered the last look up option, in which case if all others fail to
43+
// provide a secret then an environment variable under the key is returned if
44+
// found.
45+
func ParseLookupURNs(ctx context.Context, logger *slog.Logger, secretsMgmtUrns ...string) (LookupFn, error) {
46+
var tiers lookupTiers
47+
48+
for _, urn := range secretsMgmtUrns {
49+
tier, err := parseSecretsLookupURN(ctx, logger, urn)
50+
if err != nil {
51+
return nil, err
52+
}
53+
tiers = append(tiers, tier)
54+
}
55+
56+
return tiers.Lookup, nil
57+
}
58+
59+
func parseSecretsLookupURN(ctx context.Context, logger *slog.Logger, urn string) (LookupFn, error) {
60+
u, err := url.Parse(urn)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
switch u.Scheme {
66+
case "test":
67+
return func(ctx context.Context, key string) (string, bool) {
68+
return key + " " + u.Host, true
69+
}, nil
70+
case "redis":
71+
return newRedisSecretsLookup(ctx, logger, u)
72+
case "env":
73+
return func(ctx context.Context, key string) (string, bool) {
74+
return os.LookupEnv(key)
75+
}, nil
76+
case "none":
77+
return func(ctx context.Context, key string) (string, bool) {
78+
return "", false
79+
}, nil
80+
default:
81+
return nil, fmt.Errorf("secrets scheme %v not recognized", u.Scheme)
82+
}
83+
}

0 commit comments

Comments
 (0)