Skip to content

Commit

Permalink
Merge pull request #3134 from redpanda-data/consumer_group
Browse files Browse the repository at this point in the history
kafka franz: support consumer group timeout configs
  • Loading branch information
rockwotj authored Jan 21, 2025
2 parents 2752d1b + 16233f2 commit 48493f6
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 0 deletions.
30 changes: 30 additions & 0 deletions docs/modules/components/pages/inputs/kafka_franz.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ input:
regexp_topics: false
rack_id: ""
instance_id: ""
rebalance_timeout: 45s
session_timeout: 1m
heartbeat_interval: 3s
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -564,6 +567,33 @@ When using a consumer group, an instance ID specifies the groups static membersh
*Default*: `""`
=== `rebalance_timeout`
When using a consumer group, `rebalance_timeout` sets how long group members are allowed to take when a rebalance has begun. This timeout is how long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat).
*Type*: `string`
*Default*: `"45s"`
=== `session_timeout`
When using a consumer group, `session_timeout` sets how long a member in hte group can go between heartbeats. If a member does not heartbeat in this timeout, the broker will remove the member from the group and initiate a rebalance.
*Type*: `string`
*Default*: `"1m"`
=== `heartbeat_interval`
When using a consumer group, `heartbeat_interval` sets how long a group member goes between heartbeats to Kafka. Kafka uses heartbeats to ensure that a group member's sesion stays active. This value should be no higher than 1/3rd of the `session_timeout`. This is equivalent to the Java heartbeat.interval.ms setting.
*Type*: `string`
*Default*: `"3s"`
=== `start_from_oldest`
Determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or the saved offset no longer exists.
Expand Down
30 changes: 30 additions & 0 deletions docs/modules/components/pages/inputs/ockam_kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ input:
regexp_topics: false
rack_id: ""
instance_id: ""
rebalance_timeout: 45s
session_timeout: 1m
heartbeat_interval: 3s
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -361,6 +364,33 @@ When using a consumer group, an instance ID specifies the groups static membersh
*Default*: `""`
=== `kafka.rebalance_timeout`
When using a consumer group, `rebalance_timeout` sets how long group members are allowed to take when a rebalance has begun. This timeout is how long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat).
*Type*: `string`
*Default*: `"45s"`
=== `kafka.session_timeout`
When using a consumer group, `session_timeout` sets how long a member in hte group can go between heartbeats. If a member does not heartbeat in this timeout, the broker will remove the member from the group and initiate a rebalance.
*Type*: `string`
*Default*: `"1m"`
=== `kafka.heartbeat_interval`
When using a consumer group, `heartbeat_interval` sets how long a group member goes between heartbeats to Kafka. Kafka uses heartbeats to ensure that a group member's sesion stays active. This value should be no higher than 1/3rd of the `session_timeout`. This is equivalent to the Java heartbeat.interval.ms setting.
*Type*: `string`
*Default*: `"3s"`
=== `kafka.start_from_oldest`
Determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or the saved offset no longer exists.
Expand Down
30 changes: 30 additions & 0 deletions docs/modules/components/pages/inputs/redpanda.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ input:
regexp_topics: false
rack_id: ""
instance_id: ""
rebalance_timeout: 45s
session_timeout: 1m
heartbeat_interval: 3s
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -590,6 +593,33 @@ When using a consumer group, an instance ID specifies the groups static membersh
*Default*: `""`
=== `rebalance_timeout`
When using a consumer group, `rebalance_timeout` sets how long group members are allowed to take when a rebalance has begun. This timeout is how long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat).
*Type*: `string`
*Default*: `"45s"`
=== `session_timeout`
When using a consumer group, `session_timeout` sets how long a member in hte group can go between heartbeats. If a member does not heartbeat in this timeout, the broker will remove the member from the group and initiate a rebalance.
*Type*: `string`
*Default*: `"1m"`
=== `heartbeat_interval`
When using a consumer group, `heartbeat_interval` sets how long a group member goes between heartbeats to Kafka. Kafka uses heartbeats to ensure that a group member's sesion stays active. This value should be no higher than 1/3rd of the `session_timeout`. This is equivalent to the Java heartbeat.interval.ms setting.
*Type*: `string`
*Default*: `"3s"`
=== `start_from_oldest`
Determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or the saved offset no longer exists.
Expand Down
30 changes: 30 additions & 0 deletions docs/modules/components/pages/inputs/redpanda_common.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ input:
regexp_topics: false
rack_id: ""
instance_id: ""
rebalance_timeout: 45s
session_timeout: 1m
heartbeat_interval: 3s
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -190,6 +193,33 @@ When using a consumer group, an instance ID specifies the groups static membersh
*Default*: `""`
=== `rebalance_timeout`
When using a consumer group, `rebalance_timeout` sets how long group members are allowed to take when a rebalance has begun. This timeout is how long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat).
*Type*: `string`
*Default*: `"45s"`
=== `session_timeout`
When using a consumer group, `session_timeout` sets how long a member in hte group can go between heartbeats. If a member does not heartbeat in this timeout, the broker will remove the member from the group and initiate a rebalance.
*Type*: `string`
*Default*: `"1m"`
=== `heartbeat_interval`
When using a consumer group, `heartbeat_interval` sets how long a group member goes between heartbeats to Kafka. Kafka uses heartbeats to ensure that a group member's sesion stays active. This value should be no higher than 1/3rd of the `session_timeout`. This is equivalent to the Java heartbeat.interval.ms setting.
*Type*: `string`
*Default*: `"3s"`
=== `start_from_oldest`
Determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or the saved offset no longer exists.
Expand Down
30 changes: 30 additions & 0 deletions docs/modules/components/pages/inputs/redpanda_migrator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ input:
regexp_topics: false
rack_id: ""
instance_id: ""
rebalance_timeout: 45s
session_timeout: 1m
heartbeat_interval: 3s
start_from_oldest: true
fetch_max_bytes: 50MiB
fetch_max_wait: 5s
Expand Down Expand Up @@ -567,6 +570,33 @@ When using a consumer group, an instance ID specifies the groups static membersh
*Default*: `""`
=== `rebalance_timeout`
When using a consumer group, `rebalance_timeout` sets how long group members are allowed to take when a rebalance has begun. This timeout is how long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat).
*Type*: `string`
*Default*: `"45s"`
=== `session_timeout`
When using a consumer group, `session_timeout` sets how long a member in hte group can go between heartbeats. If a member does not heartbeat in this timeout, the broker will remove the member from the group and initiate a rebalance.
*Type*: `string`
*Default*: `"1m"`
=== `heartbeat_interval`
When using a consumer group, `heartbeat_interval` sets how long a group member goes between heartbeats to Kafka. Kafka uses heartbeats to ensure that a group member's sesion stays active. This value should be no higher than 1/3rd of the `session_timeout`. This is equivalent to the Java heartbeat.interval.ms setting.
*Type*: `string`
*Default*: `"3s"`
=== `start_from_oldest`
Determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or the saved offset no longer exists.
Expand Down
30 changes: 30 additions & 0 deletions internal/impl/kafka/franz_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ const (
kfrFieldFetchMinBytes = "fetch_min_bytes"
kfrFieldFetchMaxPartitionBytes = "fetch_max_partition_bytes"
kfrFieldFetchMaxWait = "fetch_max_wait"
kfrFieldSessionTimeout = "session_timeout"
kfrFieldRebalanceTimeout = "rebalance_timeout"
kfrFieldHeartbeatInterval = "heartbeat_interval"
)

// FranzConsumerFields returns a slice of fields specifically for customising
Expand Down Expand Up @@ -87,6 +90,18 @@ Finally, it's also possible to specify an explicit offset to consume from by add
Description("When using a consumer group, an instance ID specifies the groups static membership, which can prevent rebalances during reconnects. When using a instance ID the client does NOT leave the group when closing. To actually leave the group one must use an external admin command to leave the group on behalf of this instance ID. This ID must be unique per consumer within the group.").
Default("").
Advanced(),
service.NewDurationField(kfrFieldRebalanceTimeout).
Description("When using a consumer group, `rebalance_timeout` sets how long group members are allowed to take when a rebalance has begun. This timeout is how long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat).").
Default("45s").
Advanced(),
service.NewDurationField(kfrFieldSessionTimeout).
Description("When using a consumer group, `session_timeout` sets how long a member in hte group can go between heartbeats. If a member does not heartbeat in this timeout, the broker will remove the member from the group and initiate a rebalance.").
Default("1m").
Advanced(),
service.NewDurationField(kfrFieldHeartbeatInterval).
Description("When using a consumer group, `heartbeat_interval` sets how long a group member goes between heartbeats to Kafka. Kafka uses heartbeats to ensure that a group member's sesion stays active. This value should be no higher than 1/3rd of the `session_timeout`. This is equivalent to the Java heartbeat.interval.ms setting.").
Default("3s").
Advanced(),
service.NewBoolField(kfrFieldStartFromOldest).
Description("Determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset. The setting is applied when creating a new consumer group or the saved offset no longer exists.").
Default(true).
Expand Down Expand Up @@ -115,6 +130,9 @@ Finally, it's also possible to specify an explicit offset to consume from by add
type FranzConsumerDetails struct {
RackID string
InstanceID string
SessionTimeout time.Duration
RebalanceTimeout time.Duration
HeartbeatInterval time.Duration
InitialOffset kgo.Offset
Topics []string
TopicPartitions map[string]map[int32]kgo.Offset
Expand All @@ -137,6 +155,15 @@ func FranzConsumerDetailsFromConfig(conf *service.ParsedConfig) (*FranzConsumerD
if d.InstanceID, err = conf.FieldString(kfrFieldInstanceID); err != nil {
return nil, err
}
if d.SessionTimeout, err = conf.FieldDuration(kfrFieldSessionTimeout); err != nil {
return nil, err
}
if d.RebalanceTimeout, err = conf.FieldDuration(kfrFieldRebalanceTimeout); err != nil {
return nil, err
}
if d.HeartbeatInterval, err = conf.FieldDuration(kfrFieldHeartbeatInterval); err != nil {
return nil, err
}

startFromOldest, err := conf.FieldBool(kfrFieldStartFromOldest)
if err != nil {
Expand Down Expand Up @@ -207,6 +234,9 @@ func (d *FranzConsumerDetails) FranzOpts() []kgo.Opt {
kgo.FetchMinBytes(d.FetchMinBytes),
kgo.FetchMaxPartitionBytes(d.FetchMaxPartitionBytes),
kgo.FetchMaxWait(d.FetchMaxWait),
kgo.SessionTimeout(d.SessionTimeout),
kgo.RebalanceTimeout(d.RebalanceTimeout),
kgo.HeartbeatInterval(d.HeartbeatInterval),
}

if d.RegexPattern {
Expand Down

0 comments on commit 48493f6

Please sign in to comment.