Skip to content

Commit

Permalink
Merge pull request #3126 from redpanda-data/instance_id
Browse files Browse the repository at this point in the history
kafka: add instance ID to all kafka inputs
  • Loading branch information
rockwotj authored Jan 17, 2025
2 parents 28d9431 + 7141611 commit da85d74
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
### Added

- New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad)
- Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj)

## 4.45.1 - 2025-01-17

Expand Down
9 changes: 9 additions & 0 deletions docs/modules/components/pages/inputs/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ input:
token_key: ""
consumer_group: ""
client_id: benthos
instance_id: "" # No default (optional)
rack_id: ""
start_from_oldest: true
checkpoint_limit: 1024
Expand Down Expand Up @@ -480,6 +481,14 @@ An identifier for the client connection.
*Default*: `"benthos"`
=== `instance_id`
When using consumer groups, an identifier for this specific input so that it can be identified over restarts of this process. This should be unique per input.
*Type*: `string`
=== `rack_id`
A rack identifier for this client.
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/kafka_franz.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ input:
topics: [] # No default (required)
regexp_topics: false
rack_id: ""
instance_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -550,6 +551,15 @@ Whether listed topics should be interpreted as regular expression patterns for m
A rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.
*Type*: `string`
*Default*: `""`
=== `instance_id`
When using a consumer group, an instance ID specifies the groups static membership, which can prevent rebalances during reconnects. When using a instance ID the client does NOT leave the group when closing. To actually leave the group one must use an external admin command to leave the group on behalf of this instance ID. This ID must be unique per consumer within the group.
*Type*: `string`
*Default*: `""`
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/ockam_kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ input:
topics: [] # No default (required)
regexp_topics: false
rack_id: ""
instance_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -347,6 +348,15 @@ Whether listed topics should be interpreted as regular expression patterns for m
A rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.
*Type*: `string`
*Default*: `""`
=== `kafka.instance_id`
When using a consumer group, an instance ID specifies the groups static membership, which can prevent rebalances during reconnects. When using a instance ID the client does NOT leave the group when closing. To actually leave the group one must use an external admin command to leave the group on behalf of this instance ID. This ID must be unique per consumer within the group.
*Type*: `string`
*Default*: `""`
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/redpanda.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ input:
topics: [] # No default (required)
regexp_topics: false
rack_id: ""
instance_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -576,6 +577,15 @@ Whether listed topics should be interpreted as regular expression patterns for m
A rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.
*Type*: `string`
*Default*: `""`
=== `instance_id`
When using a consumer group, an instance ID specifies the groups static membership, which can prevent rebalances during reconnects. When using a instance ID the client does NOT leave the group when closing. To actually leave the group one must use an external admin command to leave the group on behalf of this instance ID. This ID must be unique per consumer within the group.
*Type*: `string`
*Default*: `""`
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/redpanda_common.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ input:
topics: [] # No default (required)
regexp_topics: false
rack_id: ""
instance_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -176,6 +177,15 @@ Whether listed topics should be interpreted as regular expression patterns for m
A rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.
*Type*: `string`
*Default*: `""`
=== `instance_id`
When using a consumer group, an instance ID specifies the groups static membership, which can prevent rebalances during reconnects. When using a instance ID the client does NOT leave the group when closing. To actually leave the group one must use an external admin command to leave the group on behalf of this instance ID. This ID must be unique per consumer within the group.
*Type*: `string`
*Default*: `""`
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/redpanda_migrator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ input:
topics: [] # No default (required)
regexp_topics: false
rack_id: ""
instance_id: ""
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -553,6 +554,15 @@ Whether listed topics should be interpreted as regular expression patterns for m
A rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.
*Type*: `string`
*Default*: `""`
=== `instance_id`
When using a consumer group, an instance ID specifies the groups static membership, which can prevent rebalances during reconnects. When using a instance ID the client does NOT leave the group when closing. To actually leave the group one must use an external admin command to leave the group on behalf of this instance ID. This ID must be unique per consumer within the group.
*Type*: `string`
*Default*: `""`
Expand Down
13 changes: 13 additions & 0 deletions internal/impl/kafka/franz_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func BytesFromStrFieldAsInt32(name string, pConf *service.ParsedConfig) (int32,

const (
// Consumer fields
kfrFieldInstanceID = "instance_id"
kfrFieldRackID = "rack_id"
kfrFieldTopics = "topics"
kfrFieldRegexpTopics = "regexp_topics"
Expand Down Expand Up @@ -82,6 +83,10 @@ Finally, it's also possible to specify an explicit offset to consume from by add
Description("A rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.").
Default("").
Advanced(),
service.NewStringField(kfrFieldInstanceID).
Description("When using a consumer group, an instance ID specifies the groups static membership, which can prevent rebalances during reconnects. When using a instance ID the client does NOT leave the group when closing. To actually leave the group one must use an external admin command to leave the group on behalf of this instance ID. This ID must be unique per consumer within the group.").
Default("").
Advanced(),
service.NewBoolField(kfrFieldStartFromOldest).
Description("Determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or the saved offset no longer exists.").
Default(true).
Expand Down Expand Up @@ -109,6 +114,7 @@ Finally, it's also possible to specify an explicit offset to consume from by add
// consumer.
type FranzConsumerDetails struct {
RackID string
InstanceID string
InitialOffset kgo.Offset
Topics []string
TopicPartitions map[string]map[int32]kgo.Offset
Expand All @@ -128,6 +134,9 @@ func FranzConsumerDetailsFromConfig(conf *service.ParsedConfig) (*FranzConsumerD
if d.RackID, err = conf.FieldString(kfrFieldRackID); err != nil {
return nil, err
}
if d.InstanceID, err = conf.FieldString(kfrFieldInstanceID); err != nil {
return nil, err
}

startFromOldest, err := conf.FieldBool(kfrFieldStartFromOldest)
if err != nil {
Expand Down Expand Up @@ -204,6 +213,10 @@ func (d *FranzConsumerDetails) FranzOpts() []kgo.Opt {
opts = append(opts, kgo.ConsumeRegex())
}

if d.InstanceID != "" {
opts = append(opts, kgo.InstanceID(d.InstanceID))
}

return opts
}

Expand Down
10 changes: 10 additions & 0 deletions internal/impl/kafka/input_sarama_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
iskFieldTLS = "tls"
iskFieldConsumerGroup = "consumer_group"
iskFieldClientID = "client_id"
iskFieldInstanceID = "instance_id"
iskFieldRackID = "rack_id"
iskFieldStartFromOldest = "start_from_oldest"
iskFieldCheckpointLimit = "checkpoint_limit"
Expand Down Expand Up @@ -122,6 +123,10 @@ Unfortunately this error message will appear for a wide range of connection prob
service.NewStringField(iskFieldClientID).
Description("An identifier for the client connection.").
Advanced().Default("benthos"),
service.NewStringField(iskFieldInstanceID).
Description("When using consumer groups, an identifier for this specific input so that it can be identified over restarts of this process. This should be unique per input.").
Advanced().
Optional(),
service.NewStringField(iskFieldRackID).
Description("A rack identifier for this client.").
Advanced().Default(""),
Expand Down Expand Up @@ -460,6 +465,11 @@ func (k *kafkaReader) saramaConfigFromParsed(conf *service.ParsedConfig) (*saram
if config.ClientID, err = conf.FieldString(iskFieldClientID); err != nil {
return nil, err
}
if conf.Contains(iskFieldInstanceID) {
if config.Consumer.Group.InstanceId, err = conf.FieldString(iskFieldInstanceID); err != nil {
return nil, err
}
}

if config.RackID, err = conf.FieldString(iskFieldRackID); err != nil {
return nil, err
Expand Down

0 comments on commit da85d74

Please sign in to comment.