Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions banjax-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,9 @@ sha_inv_path_exceptions:
- /no_challenge
# enable pprof for debugging
profile: false
# deflect dnet the banjax instance is currently in
dnet: dnext1
# deflect dnet mapping to kafka partision
dnet_to_partition:
dnext1: 0
dnext2: 1
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gin-gonic/gin v1.10.0
github.com/go-playground/validator/v10 v10.23.0 // indirect
github.com/google/uuid v1.6.0
github.com/hpcloud/tail v1.0.0
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ github.com/gonetx/ipset v0.1.0/go.mod h1:AwNAf1Vtqg0cJ4bha4w1ROX5cO/8T50UYoegxM2
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jeremy5189/ipfilter-no-iploc/v2 v2.0.3 h1:hy1vKJXU9bU99b1bVvj+kIt8afXOsHrWgcV+6JQx0CE=
Expand Down
2 changes: 2 additions & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type Config struct {
SessionCookieNotVerify bool `yaml:"session_cookie_not_verify"`
SitesToDisableBaskerville map[string]bool `yaml:"sites_to_disable_baskerville"`
SitesToShaInvPathExceptions map[string][]string `yaml:"sha_inv_path_exceptions"`
DNet string `yaml:"dnet"`
DNetToPartition map[string]int `yaml:"dnet_to_partition"`
}

type RegexWithRate struct {
Expand Down
42 changes: 31 additions & 11 deletions internal/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/segmentio/kafka-go"
)

Expand All @@ -42,6 +41,17 @@ type commandMessage struct {
Host string `json:"host"`
SessionId string `json:"session_id"`
Source string `json:"source"`
PrintLog bool `json:"print_log"`
}

func getDNetPartition(config *Config) int {
if partition, ok := config.DNetToPartition[config.DNet]; ok {
log.Printf("KAFKA: init using dnet %s maping to partition %d\n", config.DNet, partition)
return partition
} else {
log.Printf("KAFKA: dnet %s not found in dnet_to_partition mapping, using partition 0\n", config.DNet)
}
return 0
}

func getDialer(config *Config) *kafka.Dialer {
Expand Down Expand Up @@ -89,15 +99,15 @@ func RunKafkaReader(
config := configHolder.Get()
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: config.KafkaBrokers,
GroupID: uuid.New().String(),
StartOffset: kafka.LastOffset,
Partition: getDNetPartition(config),
Topic: config.KafkaCommandTopic,
Dialer: getDialer(config),
CommitInterval: time.Second * 10,
})
defer r.Close()

log.Printf("KAFKA: NewReader started")
log.Printf("KAFKA: NewReader started, log supressed unless debug or command.PrintLog = True")

for {
m, err := r.ReadMessage(ctx)
Expand All @@ -120,8 +130,10 @@ func RunKafkaReader(
continue
}

log.Printf("KAFKA: message %s (%d/%d) = N: %s, V: %s, S: %s: Src: %s\n",
string(m.Key), m.Offset, m.Partition, command.Name, command.Value, command.SessionId, command.Source)
if config.Debug || command.PrintLog {
log.Printf("KAFKA: message %s (%d/%d) = N: %s, V: %s, S: %s: Src: %s\n",
string(m.Key), m.Offset, m.Partition, command.Name, command.Value, command.SessionId, command.Source)
}

handleCommand(
configHolder.Get(),
Expand All @@ -132,8 +144,10 @@ func RunKafkaReader(

select {
case <-ctx.Done():
log.Println("KAFKA: context done, exiting kafka reader")
return
case <-time.After(5 * time.Second):
log.Println("KAFKA: reconnecting to kafka reader")
continue
}
}
Expand Down Expand Up @@ -163,7 +177,7 @@ func handleCommand(
decisionLists *DynamicDecisionLists,
) {
// exempt a site from baskerville according to config
if _, disabled := config.SitesToDisableBaskerville[command.Host]; disabled {
if _, disabled := config.SitesToDisableBaskerville[command.Host]; disabled && config.Debug {
log.Printf("KAFKA: %s disabled baskerville, skipping %s\n", command.Host, command.Name)
return
}
Expand All @@ -185,7 +199,9 @@ func handleCommand(
handleSessionCommand(config, command, decisionLists, NginxBlock, ttl)
break
default:
log.Printf("KAFKA: unrecognized command name: %s\n", command.Name)
if config.Debug {
log.Printf("KAFKA: unrecognized command name: %s\n", command.Name)
}
}
}

Expand All @@ -201,8 +217,10 @@ func handleIPCommand(
return
}

log.Printf("KAFKA: handleIPCommand %s %s %s %d\n",
command.Host, command.Value, decision, expireDuration)
if config.Debug {
log.Printf("KAFKA: handleIPCommand %s %s %s %d\n",
command.Host, command.Value, decision, expireDuration)
}

decisionLists.Update(
config,
Expand All @@ -228,8 +246,10 @@ func handleSessionCommand(
return
}

log.Printf("KAFKA: handleSessionCommand %s %s %s %s %d\n",
command.Host, command.Value, sessionIdDecoded, decision, expireDuration)
if config.Debug {
log.Printf("KAFKA: handleSessionCommand %s %s %s %s %d\n",
command.Host, command.Value, sessionIdDecoded, decision, expireDuration)
}

decisionLists.UpdateBySessionId(
config,
Expand Down