Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions banjax-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ kafka_ssl_key: "/etc/banjax/key.pem"
kafka_ssl_key_password: password
kafka_report_topic: 'banjax_report_topic'
kafka_command_topic: 'banjax_command_topic'
kafka_min_bytes: 1000000 # 1MB
kafka_max_bytes: 10000000 # 10MB
kafka_max_wait_ms: 500
kafka_dialer_timeout_seconds: 20
kafka_dialer_keep_alive_seconds: 120
password_protected_paths:
"localhost:8081":
- wp-admin
Expand Down
5 changes: 5 additions & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type Config struct {
KafkaSslKeyPassword string `yaml:"kafka_ssl_key_password"`
KafkaCommandTopic string `yaml:"kafka_command_topic"`
KafkaReportTopic string `yaml:"kafka_report_topic"`
KafkaMinBytes int `yaml:"kafka_min_bytes"`
KafkaMaxBytes int `yaml:"kafka_max_bytes"`
KafkaMaxWaitMs int `yaml:"kafka_max_wait_ms"`
KafkaDialerTimeoutSeconds int `yaml:"kafka_dialer_timeout_seconds"`
KafkaDialerKeepAliveSeconds int `yaml:"kafka_dialer_keep_alive_seconds"`
PerSiteDecisionLists map[string]map[string][]string `yaml:"per_site_decision_lists"`
GlobalDecisionLists map[string][]string `yaml:"global_decision_lists"`
ConfigVersion string `yaml:"config_version"`
Expand Down
18 changes: 16 additions & 2 deletions internal/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func getDialer(config *Config) *kafka.Dialer {
}

dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
Timeout: time.Duration(config.KafkaDialerTimeoutSeconds) * time.Second,
KeepAlive: time.Duration(config.KafkaDialerKeepAliveSeconds) * time.Second,
DualStack: true,
TLS: tlsConfig,
}
Expand All @@ -97,13 +98,26 @@ func RunKafkaReader(
// XXX this infinite loop is so we reconnect if we get dropped.
for {
config := configHolder.Get()

log.Printf(
"KAFKA: starting NewReader with config MinBytes: %d MaxBytes: %d MaxWaitMs: %d "+
"Dialer TimeoutSeconds: %d KeepAliveSeconds: %d",
config.KafkaMinBytes,
config.KafkaMaxBytes,
config.KafkaMaxWaitMs,
config.KafkaDialerTimeoutSeconds,
config.KafkaDialerKeepAliveSeconds,
)

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: config.KafkaBrokers,
StartOffset: kafka.LastOffset,
Partition: getDNetPartition(config),
Topic: config.KafkaCommandTopic,
Dialer: getDialer(config),
CommitInterval: time.Second * 10,
MinBytes: config.KafkaMinBytes, // 1MB - batch
MaxBytes: config.KafkaMaxBytes, // 10MB max fetch size
MaxWait: time.Duration(config.KafkaMaxWaitMs) * time.Millisecond, // 500ms max wait
})
defer r.Close()

Expand Down