Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions banjax-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ kafka_ssl_key: "/etc/banjax/key.pem"
kafka_ssl_key_password: password
kafka_report_topic: 'banjax_report_topic'
kafka_command_topic: 'banjax_command_topic'
kafka_min_bytes: 1e6 # 1MB
kafka_max_bytes: 1e7 # 10MB
kafka_max_wait_ms: 500
kafka_dialer_timeout_seconds: 20
kafka_dialer_keep_alive_seconds: 120
password_protected_paths:
"localhost:8081":
- wp-admin
Expand Down
5 changes: 5 additions & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type Config struct {
KafkaSslKeyPassword string `yaml:"kafka_ssl_key_password"`
KafkaCommandTopic string `yaml:"kafka_command_topic"`
KafkaReportTopic string `yaml:"kafka_report_topic"`
KafkaMinBytes int `yaml:"kafka_min_bytes"`
KafkaMaxBytes int `yaml:"kafka_max_bytes"`
KafkaMaxWaitMs int `yaml:"kafka_max_wait_ms"`
KafkaDialerTimeoutSeconds int `yaml:"kafka_dialer_timeout_seconds"`
KafkaDialerKeepAliveSeconds int `yaml:"kafka_dialer_keep_alive_seconds"`
PerSiteDecisionLists map[string]map[string][]string `yaml:"per_site_decision_lists"`
GlobalDecisionLists map[string][]string `yaml:"global_decision_lists"`
ConfigVersion string `yaml:"config_version"`
Expand Down
16 changes: 14 additions & 2 deletions internal/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func getDialer(config *Config) *kafka.Dialer {
}

dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
Timeout: time.Duration(config.KafkaDialerTimeoutSeconds) * time.Second,
KeepAlive: time.Duration(config.KafkaDialerKeepAliveSeconds) * time.Second,
DualStack: true,
TLS: tlsConfig,
}
Expand All @@ -97,13 +98,24 @@ func RunKafkaReader(
// XXX this infinite loop is so we reconnect if we get dropped.
for {
config := configHolder.Get()

log.Print("Kafka: starting NewReader with config",
" MinBytes: ", config.KafkaMinBytes,
" MaxBytes: ", config.KafkaMaxBytes,
" MaxWaitMs: ", config.KafkaMaxWaitMs,
" Dialer TimeoutSeconds: ", config.KafkaDialerTimeoutSeconds,
" KeepAliveSeconds: ", config.KafkaDialerKeepAliveSeconds,
)

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: config.KafkaBrokers,
StartOffset: kafka.LastOffset,
Partition: getDNetPartition(config),
Topic: config.KafkaCommandTopic,
Dialer: getDialer(config),
CommitInterval: time.Second * 10,
MinBytes: config.KafkaMinBytes, // 1MB - batch
MaxBytes: config.KafkaMaxBytes, // 10MB max fetch size
MaxWait: time.Duration(config.KafkaMaxWaitMs) * time.Millisecond, // 500ms max wait
})
defer r.Close()

Expand Down