diff --git a/cmd/cnetflow/cnetflow.go b/cmd/cnetflow/cnetflow.go index 1eae6de3..1c7c739e 100644 --- a/cmd/cnetflow/cnetflow.go +++ b/cmd/cnetflow/cnetflow.go @@ -20,7 +20,7 @@ var ( Addr = flag.String("addr", "", "NetFlow/IPFIX listening address") Port = flag.Int("port", 2055, "NetFlow/IPFIX listening port") - Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow/IPFIX listening port") + Reuse = flag.Int("reuse", 0, "Enable so_reuseport for NetFlow/IPFIX listening port") Workers = flag.Int("workers", 1, "Number of NetFlow workers") LogLevel = flag.String("loglevel", "info", "Log level") @@ -86,7 +86,7 @@ func main() { } log.WithFields(log.Fields{ "Type": "NetFlow"}). - Infof("Listening on UDP %v:%v", *Addr, *Port) + Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *Addr, *Port, *Reuse, *Workers) err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse) if err != nil { diff --git a/cmd/cnflegacy/cnflegacy.go b/cmd/cnflegacy/cnflegacy.go index 30c80780..8dada205 100644 --- a/cmd/cnflegacy/cnflegacy.go +++ b/cmd/cnflegacy/cnflegacy.go @@ -20,7 +20,7 @@ var ( Addr = flag.String("addr", "", "NetFlow v5 listening address") Port = flag.Int("port", 2055, "NetFlow v5 listening port") - Reuse = flag.Bool("reuse", false, "Enable so_reuseport for NetFlow v5 listening port") + Reuse = flag.Int("reuse", 0, "Enable so_reuseport for NetFlow v5 listening port") Workers = flag.Int("workers", 1, "Number of NetFlow v5 workers") LogLevel = flag.String("loglevel", "info", "Log level") @@ -84,7 +84,7 @@ func main() { } log.WithFields(log.Fields{ "Type": "NetFlowLegacy"}). - Infof("Listening on UDP %v:%v", *Addr, *Port) + Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *Addr, *Port, *Reuse, *Workers) err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse) if err != nil { diff --git a/cmd/csflow/csflow.go b/cmd/csflow/csflow.go index c326fe0e..e0847abd 100644 --- a/cmd/csflow/csflow.go +++ b/cmd/csflow/csflow.go @@ -20,7 +20,7 @@ var ( Addr = flag.String("addr", "", "sFlow listening address") Port = flag.Int("port", 6343, "sFlow listening port") - Reuse = flag.Bool("reuse", false, "Enable so_reuseport for sFlow listening port") + Reuse = flag.Int("reuse", 0, "Enable so_reuseport for sFlow listening port") Workers = flag.Int("workers", 1, "Number of sFlow workers") LogLevel = flag.String("loglevel", "info", "Log level") @@ -84,7 +84,7 @@ func main() { } log.WithFields(log.Fields{ "Type": "sFlow"}). - Infof("Listening on UDP %v:%v", *Addr, *Port) + Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *Addr, *Port, *Reuse, *Workers) err := s.FlowRoutine(*Workers, *Addr, *Port, *Reuse) if err != nil { diff --git a/cmd/goflow/goflow.go b/cmd/goflow/goflow.go index 4221d1fa..481da6ac 100644 --- a/cmd/goflow/goflow.go +++ b/cmd/goflow/goflow.go @@ -12,6 +12,8 @@ import ( "github.com/cloudflare/goflow/v3/utils" "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" + + _ "net/http/pprof" ) var ( @@ -22,17 +24,17 @@ var ( SFlowEnable = flag.Bool("sflow", true, "Enable sFlow") SFlowAddr = flag.String("sflow.addr", "", "sFlow listening address") SFlowPort = flag.Int("sflow.port", 6343, "sFlow listening port") - SFlowReuse = flag.Bool("sflow.reuserport", false, "Enable so_reuseport for sFlow") + SFlowReuse = flag.Int("sflow.reuse", 0, "Enable so_reuseport for sFlow") NFLEnable = flag.Bool("nfl", true, "Enable NetFlow v5") NFLAddr = flag.String("nfl.addr", "", "NetFlow v5 listening address") NFLPort = flag.Int("nfl.port", 2056, "NetFlow v5 listening port") - NFLReuse = flag.Bool("nfl.reuserport", false, "Enable so_reuseport for NetFlow v5") + NFLReuse = flag.Int("nfl.reuse", 0, "Enable so_reuseport for NetFlow v5") NFEnable = flag.Bool("nf", true, "Enable NetFlow/IPFIX") NFAddr = flag.String("nf.addr", "", "NetFlow/IPFIX listening address") NFPort = flag.Int("nf.port", 2055, "NetFlow/IPFIX listening port") - NFReuse = flag.Bool("nf.reuserport", false, "Enable so_reuseport for NetFlow/IPFIX") + NFReuse = flag.Int("nf.reuse", 0, "Enable so_reuseport for NetFlow/IPFIX") Workers = flag.Int("workers", 1, "Number of workers per collector") LogLevel = flag.String("loglevel", "info", "Log level") @@ -115,7 +117,7 @@ func main() { go func() { log.WithFields(log.Fields{ "Type": "sFlow"}). - Infof("Listening on UDP %v:%v", *SFlowAddr, *SFlowPort) + Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *SFlowAddr, *SFlowPort, *SFlowReuse, *Workers) err := sSFlow.FlowRoutine(*Workers, *SFlowAddr, *SFlowPort, *SFlowReuse) if err != nil { @@ -129,7 +131,7 @@ func main() { go func() { log.WithFields(log.Fields{ "Type": "NetFlow"}). - Infof("Listening on UDP %v:%v", *NFAddr, *NFPort) + Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *NFAddr, *NFPort, *NFReuse, *Workers) err := sNF.FlowRoutine(*Workers, *NFAddr, *NFPort, *NFReuse) if err != nil { @@ -143,7 +145,7 @@ func main() { go func() { log.WithFields(log.Fields{ "Type": "NetFlowLegacy"}). - Infof("Listening on UDP %v:%v", *NFLAddr, *NFLPort) + Infof("Listening on UDP %s:%d (reuse: %d, workers: %d)", *NFLAddr, *NFLPort, *NFLReuse, *Workers) err := sNFL.FlowRoutine(*Workers, *NFLAddr, *NFLPort, *NFLReuse) if err != nil { diff --git a/utils/metrics.go b/utils/metrics.go index 585923c1..e2390a7e 100644 --- a/utils/metrics.go +++ b/utils/metrics.go @@ -13,14 +13,14 @@ var ( Name: "flow_traffic_bytes", Help: "Bytes received by the application.", }, - []string{"remote_ip", "remote_port", "local_ip", "local_port", "type"}, + []string{"remote_ip", "remote_port", "local_ip", "local_port", "type", "lane"}, ) MetricTrafficPackets = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "flow_traffic_packets", Help: "Packets received by the application.", }, - []string{"remote_ip", "remote_port", "local_ip", "local_port", "type"}, + []string{"remote_ip", "remote_port", "local_ip", "local_port", "type", "lane"}, ) MetricPacketSizeSum = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -28,7 +28,7 @@ var ( Help: "Summary of packet size.", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }, - []string{"remote_ip", "remote_port", "local_ip", "local_port", "type"}, + []string{"remote_ip", "remote_port", "local_ip", "local_port", "type", "lane"}, ) DecoderStats = prometheus.NewCounterVec( prometheus.CounterOpts{ diff --git a/utils/netflow.go b/utils/netflow.go index 6fe2935d..81055fb9 100644 --- a/utils/netflow.go +++ b/utils/netflow.go @@ -68,7 +68,7 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { samplerAddress = samplerAddress.To4() } - s.templateslock.RLock() + s.templateslock.Lock() templates, ok := s.templates[key] if !ok { templates = &TemplateSystem{ @@ -77,14 +77,14 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { } s.templates[key] = templates } - s.templateslock.RUnlock() - s.samplinglock.RLock() + s.templateslock.Unlock() + s.samplinglock.Lock() sampling, ok := s.sampling[key] if !ok { sampling = producer.CreateSamplingSystem() s.sampling[key] = sampling } - s.samplinglock.RUnlock() + s.samplinglock.Unlock() ts := uint64(time.Now().UTC().Unix()) if pkt.SetTime { @@ -349,7 +349,7 @@ func (s *StateNetFlow) InitTemplates() { s.samplinglock = &sync.RWMutex{} } -func (s *StateNetFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error { +func (s *StateNetFlow) FlowRoutine(workers int, addr string, port int, reuseport int) error { s.InitTemplates() return UDPRoutine("NetFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger) } diff --git a/utils/nflegacy.go b/utils/nflegacy.go index 124216c4..91041467 100644 --- a/utils/nflegacy.go +++ b/utils/nflegacy.go @@ -84,6 +84,6 @@ func (s *StateNFLegacy) DecodeFlow(msg interface{}) error { return nil } -func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport bool) error { +func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport int) error { return UDPRoutine("NetFlowV5", s.DecodeFlow, workers, addr, port, reuseport, s.Logger) } diff --git a/utils/sflow.go b/utils/sflow.go index ef62805b..7638adaf 100644 --- a/utils/sflow.go +++ b/utils/sflow.go @@ -137,6 +137,6 @@ func (s *StateSFlow) DecodeFlow(msg interface{}) error { return nil } -func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error { +func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport int) error { return UDPRoutine("sFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger) } diff --git a/utils/utils.go b/utils/utils.go index 5be25929..a2dc5698 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "strconv" + "sync" "time" decoder "github.com/cloudflare/goflow/v3/decoders" @@ -146,7 +147,7 @@ func FlowMessageToJSON(fmsg *flowmessage.FlowMessage) string { return s } -func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse bool, logger Logger) error { +func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse int, logger Logger) error { ecb := DefaultErrorCallback{ Logger: logger, } @@ -165,73 +166,91 @@ func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr s Port: port, } - var udpconn *net.UDPConn - var err error - - if sockReuse { - pconn, err := reuseport.ListenPacket("udp", addrUDP.String()) - defer pconn.Close() - if err != nil { - return err - } - var ok bool - udpconn, ok = pconn.(*net.UDPConn) - if !ok { - return err + udpconnL := make([]*net.UDPConn, 0) + if sockReuse > 0 { + for i := 0; i < sockReuse; i++ { + pconn, err := reuseport.ListenPacket("udp", addrUDP.String()) + defer pconn.Close() + if err != nil { + return err + } + udpconn, ok := pconn.(*net.UDPConn) + if !ok { + return err + } + udpconnL = append(udpconnL, udpconn) } } else { - udpconn, err = net.ListenUDP("udp", &addrUDP) - defer udpconn.Close() + udpconn, err := net.ListenUDP("udp", &addrUDP) if err != nil { return err } + udpconnL = append(udpconnL, udpconn) } - payload := make([]byte, 9000) + routine := func(lane int, udpconn *net.UDPConn) { + localIP := addrUDP.IP.String() + if addrUDP.IP == nil { + localIP = "" + } - localIP := addrUDP.IP.String() - if addrUDP.IP == nil { - localIP = "" - } + for { + payload := make([]byte, 9000) - for { - size, pktAddr, _ := udpconn.ReadFromUDP(payload) - payloadCut := make([]byte, size) - copy(payloadCut, payload[0:size]) + size, pktAddr, _ := udpconn.ReadFromUDP(payload) + payloadCut := payload[0:size] + baseMessage := BaseMessage{ + Src: pktAddr.IP, + Port: pktAddr.Port, + Payload: payloadCut, + } + processor.ProcessMessage(baseMessage) - baseMessage := BaseMessage{ - Src: pktAddr.IP, - Port: pktAddr.Port, - Payload: payloadCut, + MetricTrafficBytes.With( + prometheus.Labels{ + "remote_ip": pktAddr.IP.String(), + "remote_port": strconv.Itoa(pktAddr.Port), + "local_ip": localIP, + "local_port": strconv.Itoa(addrUDP.Port), + "type": name, + "lane": strconv.Itoa(lane), + }). + Add(float64(size)) + MetricTrafficPackets.With( + prometheus.Labels{ + "remote_ip": pktAddr.IP.String(), + "remote_port": strconv.Itoa(pktAddr.Port), + "local_ip": localIP, + "local_port": strconv.Itoa(addrUDP.Port), + "type": name, + "lane": strconv.Itoa(lane), + }). + Inc() + MetricPacketSizeSum.With( + prometheus.Labels{ + "remote_ip": pktAddr.IP.String(), + "remote_port": strconv.Itoa(pktAddr.Port), + "local_ip": localIP, + "local_port": strconv.Itoa(addrUDP.Port), + "type": name, + "lane": strconv.Itoa(lane), + }). + Observe(float64(size)) } - processor.ProcessMessage(baseMessage) - - MetricTrafficBytes.With( - prometheus.Labels{ - "remote_ip": pktAddr.IP.String(), - "remote_port": strconv.Itoa(pktAddr.Port), - "local_ip": localIP, - "local_port": strconv.Itoa(addrUDP.Port), - "type": name, - }). - Add(float64(size)) - MetricTrafficPackets.With( - prometheus.Labels{ - "remote_ip": pktAddr.IP.String(), - "remote_port": strconv.Itoa(pktAddr.Port), - "local_ip": localIP, - "local_port": strconv.Itoa(addrUDP.Port), - "type": name, - }). - Inc() - MetricPacketSizeSum.With( - prometheus.Labels{ - "remote_ip": pktAddr.IP.String(), - "remote_port": strconv.Itoa(pktAddr.Port), - "local_ip": localIP, - "local_port": strconv.Itoa(addrUDP.Port), - "type": name, - }). - Observe(float64(size)) } + + wg := &sync.WaitGroup{} + for i := range udpconnL { + wg.Add(1) + lane := i + udpconn := udpconnL[i] + go func(lane int, udpconn *net.UDPConn) { + defer wg.Done() + routine(lane, udpconn) + udpconn.Close() + }(lane, udpconn) + } + + wg.Wait() + return nil }