Skip to content

Commit 71a5e1e

Browse files
authored
feat: Kafka SASL OAUTH token refreshing (#2834)
1 parent 319dce7 commit 71a5e1e

File tree

8 files changed

+347
-65
lines changed

8 files changed

+347
-65
lines changed

etc/kapacitor/kapacitor.conf

+35-16
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ default-retention-policy = ""
3434
# Password for basic user authorization when using meta API. meta-username must also be set.
3535
# meta-password = "kapapass"
3636

37-
# Shared secret for JWT bearer token authentication when using meta API.
37+
# Shared secret for JWT bearer token authentication when using meta API.
3838
# If this is set, then the `meta-username` and `meta-password` settings are ignored.
3939
# This should match the `[meta] internal-shared-secret` setting on the meta nodes.
4040
# meta-internal-shared-secret = "MyVoiceIsMyPassport"
@@ -573,27 +573,46 @@ default-retention-policy = ""
573573
# Use SSL but skip chain & host verification
574574
insecure-skip-verify = false
575575
## Optional SASL Config
576-
# sasl_username = "kafka"
577-
# sasl_password = "secret"
576+
# sasl-username = "kafka"
577+
# sasl-password = "secret"
578+
## Arbitrary key value string pairs to pass as a TOML table. For example:
579+
# {logicalCluster = "cluster-042", poolId = "pool-027"}
580+
# sasl-extensions = {}
578581
## Optional SASL:
579582
## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
580583
## (defaults to PLAIN)
581-
# sasl_mechanism = ""
584+
# sasl-mechanism = ""
582585
## used if sasl_mechanism is GSSAPI
583-
# sasl_gssapi_service_name = ""
586+
# sasl-gssapi-service-name = ""
584587
# ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
585-
# sasl_gssapi_auth_type = "KRB5_USER_AUTH"
586-
# sasl_gssapi_kerberos_config_path = "/"
587-
# sasl_gssapi_realm = "realm"
588-
# sasl_gssapi_key_tab_path = ""
589-
# sasl_gssapi_disable_pafxfast = false
590-
## Access token used if sasl_mechanism is OAUTHBEARER
591-
# sasl_access_token = ""
592-
## Arbitrary key value string pairs to pass as a TOML table. For example:
593-
# {logicalCluster = "cluster-042", poolId = "pool-027"}
594-
# sasl_extensions = {}
588+
# sasl-gssapi-auth-type = "KRB5_USER_AUTH"
589+
# sasl-gssapi-kerberos-config-path = "/"
590+
# sasl-gssapi-realm = "realm"
591+
# sasl-gssapi-key-tab-path = ""
592+
# sasl-gssapi-disable-pafxfast = false
593+
## Options if sasl-mechanism is OAUTHBEARER
594+
## The service name to use when authenticating with SASL/OAUTH.
595+
# ## One of: "" or custom, auth0, azuread
596+
# sasl-oauth-service = ""
597+
## The client ID to use when authenticating with SASL/OAUTH.
598+
# sasl-oauth-client-id = ""
599+
## The client secret to use when authenticating with SASL/OAUTH.
600+
# sasl-oauth-client-secret = ""
601+
## The token URL to use when sasl-oauth-service is custom or auth0. Leave empty otherwise.
602+
# sasl-oauth-token-url = ""
603+
## The margin for the token's expiration time.
604+
# sasl-oauth-token-expiry-margin = "10s"
605+
## Optional scopes to use when authenticating with SASL/OAUTH.
606+
# sasl-oauth-scopes = ""
607+
## Tenant ID for the AzureAD service.
608+
# sasl-oauth-tenant-id = ""
609+
## The optional params for SASL/OAUTH. e.g. audience for AUTH0
610+
[kafka.sasl-oauth-parameters]
611+
# audience = ""
612+
## Static OAUTH token. Use this instead of other OAUTH params.
613+
# sasl-access-token = ""
595614
## SASL protocol version. When connecting to Azure EventHub set to 0.
596-
# sasl_version = 1
615+
# sasl-version = 1
597616

598617
[alerta]
599618
# Configure Alerta.

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ require (
6565
cloud.google.com/go/bigquery v1.50.0 // indirect
6666
cloud.google.com/go/bigtable v1.10.1 // indirect
6767
cloud.google.com/go/compute v1.19.1 // indirect
68-
cloud.google.com/go/compute/metadata v0.2.3 // indirect
68+
cloud.google.com/go/compute/metadata v0.3.0 // indirect
6969
cloud.google.com/go/iam v0.13.0 // indirect
7070
cloud.google.com/go/longrunning v0.4.1 // indirect
7171
collectd.org v0.3.0 // indirect
@@ -242,7 +242,7 @@ require (
242242
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
243243
golang.org/x/mod v0.17.0 // indirect
244244
golang.org/x/net v0.28.0 // indirect
245-
golang.org/x/oauth2 v0.7.0 // indirect
245+
golang.org/x/oauth2 v0.23.0 // indirect
246246
golang.org/x/sync v0.8.0 // indirect
247247
golang.org/x/sys v0.23.0 // indirect
248248
golang.org/x/term v0.23.0 // indirect

go.sum

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ cloud.google.com/go/compute v1.19.1 h1:am86mquDUgjGNWxiGn+5PGLbmgiWXlE/yNWpIpNvu
3939
cloud.google.com/go/compute v1.19.1/go.mod h1:6ylj3a05WF8leseCdIf77NK0g1ey+nj5IKd5/kvShxE=
4040
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
4141
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
42+
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc=
43+
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
4244
cloud.google.com/go/datacatalog v1.13.0 h1:4H5IJiyUE0X6ShQBqgFFZvGGcrwGVndTwUSLP4c52gw=
4345
cloud.google.com/go/datacatalog v1.13.0/go.mod h1:E4Rj9a5ZtAxcQJlEBTLgMTphfP11/lNaAshpoBgemX8=
4446
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
@@ -1623,6 +1625,8 @@ golang.org/x/oauth2 v0.0.0-20210427180440-81ed05c6b58c/go.mod h1:KelEdhl1UZF7XfJ
16231625
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
16241626
golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g=
16251627
golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4=
1628+
golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
1629+
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
16261630
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
16271631
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
16281632
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

services/kafka/config.go

+23-11
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
)
1313

1414
const (
15-
DefaultTimeout = 10 * time.Second
16-
DefaultBatchSize = 100
17-
DefaultBatchTimeout = 1 * time.Second
18-
DefaultID = "default"
15+
DefaultTimeout = 10 * time.Second
16+
DefaultBatchSize = 100
17+
DefaultBatchTimeout = 1 * time.Second
18+
DefaultID = "default"
19+
DefaultSASLOAUTHExpiryMargin = 10 * time.Second
1920
)
2021

2122
type Config struct {
@@ -49,7 +50,7 @@ type Config struct {
4950
}
5051

5152
func NewConfig() Config {
52-
return Config{ID: DefaultID}
53+
return Config{ID: DefaultID, SASLAuth: SASLAuth{SASLOAUTHExpiryMargin: DefaultSASLOAUTHExpiryMargin}}
5354
}
5455

5556
func (c Config) Validate() error {
@@ -63,7 +64,7 @@ func (c Config) Validate() error {
6364
if len(c.Brokers) == 0 {
6465
return errors.New("no brokers specified, must provide at least one broker URL")
6566
}
66-
return nil
67+
return c.SASLAuth.Validate()
6768
}
6869

6970
func (c *Config) ApplyConditionalDefaults() {
@@ -78,17 +79,27 @@ func (c *Config) ApplyConditionalDefaults() {
7879
}
7980
}
8081

82+
type Closer interface {
83+
Close()
84+
}
85+
86+
type WriterConfig struct {
87+
// additional resource to close
88+
Closer Closer
89+
Config *kafka.Config
90+
}
91+
8192
type WriteTarget struct {
8293
Topic string
8394
PartitionById bool
8495
PartitionAlgorithm string
8596
}
8697

87-
func (c Config) writerConfig(diagnostic Diagnostic, target WriteTarget) (*kafka.Config, error) {
98+
func (c Config) writerConfig(target WriteTarget) (*WriterConfig, error) {
8899
cfg := kafka.NewConfig()
89100

90101
if target.Topic == "" {
91-
return cfg, errors.New("topic must not be empty")
102+
return &WriterConfig{nil, cfg}, errors.New("topic must not be empty")
92103
}
93104
var partitioner kafka.PartitionerConstructor
94105
if target.PartitionById {
@@ -104,7 +115,7 @@ func (c Config) writerConfig(diagnostic Diagnostic, target WriteTarget) (*kafka.
104115
case "fnv-1a":
105116
partitioner = kafka.NewHashPartitioner
106117
default:
107-
return cfg, fmt.Errorf("invalid partition algorithm: %q", target.PartitionAlgorithm)
118+
return &WriterConfig{nil, cfg}, fmt.Errorf("invalid partition algorithm: %q", target.PartitionAlgorithm)
108119
}
109120
cfg.Producer.Partitioner = partitioner
110121
}
@@ -135,10 +146,11 @@ func (c Config) writerConfig(diagnostic Diagnostic, target WriteTarget) (*kafka.
135146
cfg.Producer.Flush.Frequency = time.Duration(c.BatchTimeout)
136147

137148
// SASL
138-
if err := c.SASLAuth.SetSASLConfig(cfg); err != nil {
149+
if o, err := c.SASLAuth.SetSASLConfig(cfg); err != nil {
139150
return nil, err
151+
} else {
152+
return &WriterConfig{o, cfg}, cfg.Validate()
140153
}
141-
return cfg, cfg.Validate()
142154
}
143155

144156
type Configs []Config

0 commit comments

Comments
 (0)