Skip to content
This repository was archived by the owner on Feb 19, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/cnetflow/cnetflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (

Addr = flag.String("addr", "", "NetFlow/IPFIX listening address")
Port = flag.Int("port", 2055, "NetFlow/IPFIX listening port")
Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow/IPFIX listening port")
Reuse = flag.Int("reuse", 0, "Enable so_reuseport for NetFlow/IPFIX listening port")

Workers = flag.Int("workers", 1, "Number of NetFlow workers")
LogLevel = flag.String("loglevel", "info", "Log level")
Expand Down Expand Up @@ -86,7 +86,7 @@ func main() {
}
log.WithFields(log.Fields{
"Type": "NetFlow"}).
Infof("Listening on UDP %v:%v", *Addr, *Port)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *Addr, *Port, *Reuse, *Workers)

err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/cnflegacy/cnflegacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (

Addr = flag.String("addr", "", "NetFlow v5 listening address")
Port = flag.Int("port", 2055, "NetFlow v5 listening port")
Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow v5 listening port")
Reuse = flag.Int("reuse", 0, "Enable so_reuseport for NetFlow v5 listening port")

Workers = flag.Int("workers", 1, "Number of NetFlow v5 workers")
LogLevel = flag.String("loglevel", "info", "Log level")
Expand Down Expand Up @@ -84,7 +84,7 @@ func main() {
}
log.WithFields(log.Fields{
"Type": "NetFlowLegacy"}).
Infof("Listening on UDP %v:%v", *Addr, *Port)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *Addr, *Port, *Reuse, *Workers)

err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/csflow/csflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (

Addr = flag.String("addr", "", "sFlow listening address")
Port = flag.Int("port", 6343, "sFlow listening port")
Reuse = flag.Bool("reuse", false, "Enable so_reuseport for sFlow listening port")
Reuse = flag.Int("reuse", 0, "Enable so_reuseport for sFlow listening port")

Workers = flag.Int("workers", 1, "Number of sFlow workers")
LogLevel = flag.String("loglevel", "info", "Log level")
Expand Down Expand Up @@ -84,7 +84,7 @@ func main() {
}
log.WithFields(log.Fields{
"Type": "sFlow"}).
Infof("Listening on UDP %v:%v", *Addr, *Port)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *Addr, *Port, *Reuse, *Workers)

err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse)
if err != nil {
Expand Down
14 changes: 8 additions & 6 deletions cmd/goflow/goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/cloudflare/goflow/v3/utils"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"

_ "net/http/pprof"
)

var (
Expand All @@ -22,17 +24,17 @@ var (
SFlowEnable = flag.Bool("sflow", true, "Enable sFlow")
SFlowAddr = flag.String("sflow.addr", "", "sFlow listening address")
SFlowPort = flag.Int("sflow.port", 6343, "sFlow listening port")
SFlowReuse = flag.Bool("sflow.reuserport", false, "Enable so_reuseport for sFlow")
SFlowReuse = flag.Int("sflow.reuse", 0, "Enable so_reuseport for sFlow")

NFLEnable = flag.Bool("nfl", true, "Enable NetFlow v5")
NFLAddr = flag.String("nfl.addr", "", "NetFlow v5 listening address")
NFLPort = flag.Int("nfl.port", 2056, "NetFlow v5 listening port")
NFLReuse = flag.Bool("nfl.reuserport", false, "Enable so_reuseport for NetFlow v5")
NFLReuse = flag.Int("nfl.reuse", 0, "Enable so_reuseport for NetFlow v5")

NFEnable = flag.Bool("nf", true, "Enable NetFlow/IPFIX")
NFAddr = flag.String("nf.addr", "", "NetFlow/IPFIX listening address")
NFPort = flag.Int("nf.port", 2055, "NetFlow/IPFIX listening port")
NFReuse = flag.Bool("nf.reuserport", false, "Enable so_reuseport for NetFlow/IPFIX")
NFReuse = flag.Int("nf.reuse", 0, "Enable so_reuseport for NetFlow/IPFIX")

Workers = flag.Int("workers", 1, "Number of workers per collector")
LogLevel = flag.String("loglevel", "info", "Log level")
Expand Down Expand Up @@ -115,7 +117,7 @@ func main() {
go func() {
log.WithFields(log.Fields{
"Type": "sFlow"}).
Infof("Listening on UDP %v:%v", *SFlowAddr, *SFlowPort)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *SFlowAddr, *SFlowPort, *SFlowReuse, *Workers)

err := sSFlow.FlowRoutine(*Workers, *SFlowAddr, *SFlowPort, *SFlowReuse)
if err != nil {
Expand All @@ -129,7 +131,7 @@ func main() {
go func() {
log.WithFields(log.Fields{
"Type": "NetFlow"}).
Infof("Listening on UDP %v:%v", *NFAddr, *NFPort)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *NFAddr, *NFPort, *NFReuse, *Workers)

err := sNF.FlowRoutine(*Workers, *NFAddr, *NFPort, *NFReuse)
if err != nil {
Expand All @@ -143,7 +145,7 @@ func main() {
go func() {
log.WithFields(log.Fields{
"Type": "NetFlowLegacy"}).
Infof("Listening on UDP %v:%v", *NFLAddr, *NFLPort)
Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *NFLAddr, *NFLPort, *NFLReuse, *Workers)

err := sNFL.FlowRoutine(*Workers, *NFLAddr, *NFLPort, *NFLReuse)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions utils/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ var (
Name: "flow_traffic_bytes",
Help: "Bytes received by the application.",
},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type"},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type", "lane"},
)
MetricTrafficPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_traffic_packets",
Help: "Packets received by the application.",
},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type"},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type", "lane"},
)
MetricPacketSizeSum = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "flow_traffic_summary_size_bytes",
Help: "Summary of packet size.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type"},
[]string{"remote_ip", "remote_port", "local_ip", "local_port", "type", "lane"},
)
DecoderStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down
10 changes: 5 additions & 5 deletions utils/netflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error {
samplerAddress = samplerAddress.To4()
}

s.templateslock.RLock()
s.templateslock.Lock()
templates, ok := s.templates[key]
if !ok {
templates = &TemplateSystem{
Expand All @@ -77,14 +77,14 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error {
}
s.templates[key] = templates
}
s.templateslock.RUnlock()
s.samplinglock.RLock()
s.templateslock.Unlock()
s.samplinglock.Lock()
sampling, ok := s.sampling[key]
if !ok {
sampling = producer.CreateSamplingSystem()
s.sampling[key] = sampling
}
s.samplinglock.RUnlock()
s.samplinglock.Unlock()

ts := uint64(time.Now().UTC().Unix())
if pkt.SetTime {
Expand Down Expand Up @@ -349,7 +349,7 @@ func (s *StateNetFlow) InitTemplates() {
s.samplinglock = &sync.RWMutex{}
}

func (s *StateNetFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
func (s *StateNetFlow) FlowRoutine(workers int, addr string, port int, reuseport int) error {
s.InitTemplates()
return UDPRoutine("NetFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}
2 changes: 1 addition & 1 deletion utils/nflegacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ func (s *StateNFLegacy) DecodeFlow(msg interface{}) error {
return nil
}

func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport int) error {
return UDPRoutine("NetFlowV5", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}
2 changes: 1 addition & 1 deletion utils/sflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,6 @@ func (s *StateSFlow) DecodeFlow(msg interface{}) error {
return nil
}

func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport int) error {
return UDPRoutine("sFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}
135 changes: 77 additions & 58 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net"
"strconv"
"sync"
"time"

decoder "github.com/cloudflare/goflow/v3/decoders"
Expand Down Expand Up @@ -146,7 +147,7 @@ func FlowMessageToJSON(fmsg *flowmessage.FlowMessage) string {
return s
}

func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse bool, logger Logger) error {
func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse int, logger Logger) error {
ecb := DefaultErrorCallback{
Logger: logger,
}
Expand All @@ -165,73 +166,91 @@ func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr s
Port: port,
}

var udpconn *net.UDPConn
var err error

if sockReuse {
pconn, err := reuseport.ListenPacket("udp", addrUDP.String())
defer pconn.Close()
if err != nil {
return err
}
var ok bool
udpconn, ok = pconn.(*net.UDPConn)
if !ok {
return err
udpconnL := make([]*net.UDPConn, 0)
if sockReuse > 0 {
for i := 0; i < sockReuse; i++ {
pconn, err := reuseport.ListenPacket("udp", addrUDP.String())
defer pconn.Close()
if err != nil {
return err
}
udpconn, ok := pconn.(*net.UDPConn)
if !ok {
return err
}
udpconnL = append(udpconnL, udpconn)
}
} else {
udpconn, err = net.ListenUDP("udp", &addrUDP)
defer udpconn.Close()
udpconn, err := net.ListenUDP("udp", &addrUDP)
if err != nil {
return err
}
udpconnL = append(udpconnL, udpconn)
}

payload := make([]byte, 9000)
routine := func(lane int, udpconn *net.UDPConn) {
localIP := addrUDP.IP.String()
if addrUDP.IP == nil {
localIP = ""
}

localIP := addrUDP.IP.String()
if addrUDP.IP == nil {
localIP = ""
}
for {
payload := make([]byte, 9000)

for {
size, pktAddr, _ := udpconn.ReadFromUDP(payload)
payloadCut := make([]byte, size)
copy(payloadCut, payload[0:size])
size, pktAddr, _ := udpconn.ReadFromUDP(payload)
payloadCut := payload[0:size]
baseMessage := BaseMessage{
Src: pktAddr.IP,
Port: pktAddr.Port,
Payload: payloadCut,
}
processor.ProcessMessage(baseMessage)

baseMessage := BaseMessage{
Src: pktAddr.IP,
Port: pktAddr.Port,
Payload: payloadCut,
MetricTrafficBytes.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
"lane": strconv.Itoa(lane),
}).
Add(float64(size))
MetricTrafficPackets.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
"lane": strconv.Itoa(lane),
}).
Inc()
MetricPacketSizeSum.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
"lane": strconv.Itoa(lane),
}).
Observe(float64(size))
}
processor.ProcessMessage(baseMessage)

MetricTrafficBytes.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Add(float64(size))
MetricTrafficPackets.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Inc()
MetricPacketSizeSum.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"remote_port": strconv.Itoa(pktAddr.Port),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Observe(float64(size))
}

wg := &sync.WaitGroup{}
for i := range udpconnL {
wg.Add(1)
lane := i
udpconn := udpconnL[i]
go func(lane int, udpconn *net.UDPConn) {
defer wg.Done()
routine(lane, udpconn)
udpconn.Close()
}(lane, udpconn)
}

wg.Wait()
return nil
}