Merge pull request #2824 from rockwotj/maxwrite-franz
rockwotj authored Aug 30, 2024
2 parents d49c1cd + e4cd9ee commit 4526abd
Showing 3 changed files with 75 additions and 19 deletions.
18 changes: 18 additions & 0 deletions docs/modules/components/pages/outputs/kafka_franz.adoc
@@ -84,6 +84,7 @@ output:
check: ""
processors: [] # No default (optional)
max_message_bytes: 1MB
broker_write_max_bytes: 100MB
compression: "" # No default (optional)
tls:
enabled: false
@@ -397,6 +398,23 @@ max_message_bytes: 100MB
max_message_bytes: 50mib
```
=== `broker_write_max_bytes`

The upper bound for the number of bytes written to a broker connection in a single write. This field corresponds to Kafka's `socket.request.max.bytes`.

*Type*: `string`

*Default*: `"100MB"`

```yml
# Examples

broker_write_max_bytes: 128MB

broker_write_max_bytes: 50mib
```

=== `compression`
Optionally set an explicit compression type. The default preference is to use snappy when the broker supports it, and fall back to none if not.
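The examples above mix two unit conventions: decimal suffixes such as `MB` and binary suffixes such as `mib`. Below is a minimal standalone sketch (not part of this commit) of how such strings resolve to byte counts, using the `github.com/dustin/go-humanize` package that the Go change further down relies on, with a hypothetical helper that mirrors the commit's own 1 GiB ceiling for `broker_write_max_bytes`:

```go
// Standalone sketch: parse human-readable sizes the way this output does and
// apply the commit's 1 GiB upper bound for broker_write_max_bytes.
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

// parseBrokerWriteMaxBytes is a hypothetical helper mirroring the validation
// added in NewFranzKafkaWriterFromConfig in the Go diff below.
func parseBrokerWriteMaxBytes(s string) (int32, error) {
	n, err := humanize.ParseBytes(s)
	if err != nil {
		return 0, fmt.Errorf("failed to parse broker_write_max_bytes: %w", err)
	}
	if n > 1<<30 {
		return 0, fmt.Errorf("invalid broker_write_max_bytes, must not exceed %v", 1<<30)
	}
	return int32(n), nil
}

func main() {
	for _, s := range []string{"128MB", "50mib", "2GiB"} {
		n, err := parseBrokerWriteMaxBytes(s)
		fmt.Println(s, n, err)
	}
	// 128MB -> 128000000 (decimal megabytes)
	// 50mib -> 52428800  (binary mebibytes)
	// 2GiB  -> rejected: exceeds the 1 GiB ceiling
}
```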
18 changes: 18 additions & 0 deletions docs/modules/components/pages/outputs/ockam_kafka.adoc
@@ -92,6 +92,7 @@ output:
check: ""
processors: [] # No default (optional)
max_message_bytes: 1MB
broker_write_max_bytes: 100MB
compression: "" # No default (optional)
tls:
enabled: false
@@ -416,6 +417,23 @@ max_message_bytes: 100MB
max_message_bytes: 50mib
```
=== `kafka.broker_write_max_bytes`

The upper bound for the number of bytes written to a broker connection in a single write. This field corresponds to Kafka's `socket.request.max.bytes`.

*Type*: `string`

*Default*: `"100MB"`

```yml
# Examples

broker_write_max_bytes: 128MB

broker_write_max_bytes: 50mib
```

=== `kafka.compression`
Optionally set an explicit compression type. The default preference is to use snappy when the broker supports it, and fall back to none if not.
58 changes: 39 additions & 19 deletions internal/impl/kafka/output_kafka_franz.go
@@ -106,6 +106,12 @@ func FranzKafkaOutputConfigFields() []*service.ConfigField {
Default("1MB").
Example("100MB").
Example("50mib"),
service.NewStringField("broker_write_max_bytes").
Description("The upper bound for the number of bytes written to a broker connection in a single write. This field corresponds to Kafka's `socket.request.max.bytes`.").
Advanced().
Default("100MB").
Example("128MB").
Example("50mib"),
service.NewStringEnumField("compression", "lz4", "snappy", "gzip", "none", "zstd").
Description("Optionally set an explicit compression type. The default preference is to use snappy when the broker supports it, and fall back to none if not.").
Optional().
@@ -147,21 +153,22 @@ func init() {

// FranzKafkaWriter implements a kafka writer using the franz-go library.
type FranzKafkaWriter struct {
SeedBrokers []string
topic *service.InterpolatedString
key *service.InterpolatedString
partition *service.InterpolatedString
timestamp *service.InterpolatedString
clientID string
rackID string
idempotentWrite bool
TLSConf *tls.Config
saslConfs []sasl.Mechanism
metaFilter *service.MetadataFilter
partitioner kgo.Partitioner
timeout time.Duration
produceMaxBytes int32
compressionPrefs []kgo.CompressionCodec
SeedBrokers []string
topic *service.InterpolatedString
key *service.InterpolatedString
partition *service.InterpolatedString
timestamp *service.InterpolatedString
clientID string
rackID string
idempotentWrite bool
TLSConf *tls.Config
saslConfs []sasl.Mechanism
metaFilter *service.MetadataFilter
partitioner kgo.Partitioner
timeout time.Duration
produceMaxBytes int32
brokerWriteMaxBytes int32
compressionPrefs []kgo.CompressionCodec

client *kgo.Client

@@ -205,18 +212,30 @@ func NewFranzKafkaWriterFromConfig(conf *service.ParsedConfig, log *service.Logg
return nil, err
}

maxBytesStr, err := conf.FieldString("max_message_bytes")
maxMessageBytesStr, err := conf.FieldString("max_message_bytes")
if err != nil {
return nil, err
}
maxBytes, err := humanize.ParseBytes(maxBytesStr)
maxMessageBytes, err := humanize.ParseBytes(maxMessageBytesStr)
if err != nil {
return nil, fmt.Errorf("failed to parse max_message_bytes: %w", err)
}
if maxBytes > uint64(math.MaxInt32) {
if maxMessageBytes > uint64(math.MaxInt32) {
return nil, fmt.Errorf("invalid max_message_bytes, must not exceed %v", math.MaxInt32)
}
f.produceMaxBytes = int32(maxBytes)
f.produceMaxBytes = int32(maxMessageBytes)
brokerWriteMaxBytesStr, err := conf.FieldString("broker_write_max_bytes")
if err != nil {
return nil, err
}
brokerWriteMaxBytes, err := humanize.ParseBytes(brokerWriteMaxBytesStr)
if err != nil {
return nil, fmt.Errorf("failed to parse broker_write_max_bytes: %w", err)
}
if brokerWriteMaxBytes > 1<<30 {
return nil, fmt.Errorf("invalid broker_write_max_bytes, must not exceed %v", 1<<30)
}
f.brokerWriteMaxBytes = int32(brokerWriteMaxBytes)

if conf.Contains("compression") {
cStr, err := conf.FieldString("compression")
@@ -313,6 +332,7 @@ func (f *FranzKafkaWriter) Connect(ctx context.Context) error {
kgo.SASL(f.saslConfs...),
kgo.AllowAutoTopicCreation(), // TODO: Configure this
kgo.ProducerBatchMaxBytes(f.produceMaxBytes),
kgo.BrokerMaxWriteBytes(f.brokerWriteMaxBytes),
kgo.ProduceRequestTimeout(f.timeout),
kgo.ClientID(f.clientID),
kgo.Rack(f.rackID),
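For context on where the new option lands, franz-go exposes both size limits as client options: `ProducerBatchMaxBytes` (fed by `max_message_bytes`) caps a single produce request, while the new `BrokerMaxWriteBytes` (fed by `broker_write_max_bytes`) caps one write to the broker connection. A standalone producer sketch, not taken from this repository, with a placeholder broker address and topic:

```go
// Standalone sketch showing the two size limits side by side on a franz-go
// producer client. Broker address and topic are placeholders.
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),    // placeholder broker
		kgo.ProducerBatchMaxBytes(1_000_000), // ~ max_message_bytes: 1MB
		kgo.BrokerMaxWriteBytes(100_000_000), // ~ broker_write_max_bytes: 100MB
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	res := client.ProduceSync(context.Background(), &kgo.Record{
		Topic: "example",
		Value: []byte("hello"),
	})
	if err := res.FirstErr(); err != nil {
		log.Fatal(err)
	}
}
```

A single write has to be able to carry at least one full produce request, which is presumably why the default here (100MB) sits well above the default `max_message_bytes` (1MB).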
