diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..580685e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,19 @@ +FROM golang:1.12.1-alpine AS builder + +RUN apk add --no-cache git tzdata + +WORKDIR /go/src/statsrelay + +ADD . . + +RUN go get +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build + +FROM alpine:latest +WORKDIR /root + +# Copy our static executable. +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=builder /go/src/statsrelay/statsrelay . + +ENTRYPOINT ["./statsrelay"] \ No newline at end of file diff --git a/statsrelay.go b/statsrelay.go index 17c0895..cf245b4 100644 --- a/statsrelay.go +++ b/statsrelay.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "github.com/jpillora/backoff" + "github.com/patrickmn/go-cache" "io/ioutil" "log" "net" @@ -19,10 +20,9 @@ import ( "sync" "syscall" "time" - "github.com/patrickmn/go-cache" ) -const VERSION string = "0.0.9" +const VERSION string = "0.1.0" // BUFFERSIZE controls the size of the [...]byte array used to read UDP data // off the wire and into local memory. Metrics are separated by \n @@ -47,8 +47,8 @@ var metricsPrefix string // Default is empty var metricTags string -// udpAddr is a mapping of HOST:PORT:INSTANCE to a UDPAddr object -var udpAddr = make(map[string]*net.UDPAddr) +// udpSourceAddr is a mapping of HOST:PORT:INSTANCE to a UDPAddr object used when udp single source is enabled +var udpSourceAddr = make(map[string]*net.UDPAddr) // tcpAddr is a mapping of HOST:PORT:INSTANCE to a TCPAddr object var tcpAddr = make(map[string]*net.TCPAddr) @@ -121,6 +121,15 @@ var c = cache.New(dnscacheTime, dnscachePurge) // ctarget cached target used for resolving var ctarget string +func getUdpLocalAddr() *net.UDPAddr { + conn, err := net.ListenUDP("udp", nil) + if err != nil { + log.Fatal(err) + } + conn.Close() + return conn.LocalAddr().(*net.UDPAddr) +} + // sockBufferMaxSize() returns the maximum size that the UDP receive buffer // in the kernel can be set to. In bytes. func getSockBufferMaxSize() (int, error) { @@ -191,21 +200,55 @@ func genTags(metric, metricTags string) string { return fmt.Sprintf("%s|#%s", metric, metricTags) } +func getUDPDestnAddr(target string) *net.UDPAddr { + if !dnscache { + targetaddr, err := net.ResolveUDPAddr("udp", target) + if err != nil { + log.Printf("Error resolving target %s", target) + return nil + } + return targetaddr + } + + gettarget, found := c.Get(target) + if found { + if verbose { + log.Printf("Found in cache target %s (%s)", target, gettarget) + } + return gettarget.(*net.UDPAddr) + } else { + targetaddr, err := net.ResolveUDPAddr("udp", target) + if verbose { + log.Printf("Not found in cache adding target %s (%s)", target, targetaddr) + } + if err != nil { + log.Printf("Error resolving target %s", target) + return nil + } + c.Set(target, targetaddr, dnscacheExp) + return targetaddr + } +} + // sendPacket takes a []byte and writes that directly to a UDP socket // that was assigned for target. func sendPacket(buff []byte, target string, sendproto string, TCPtimeout time.Duration, boff *backoff.Backoff) { - switch sendproto { - case "UDP": - conn, err := net.ListenUDP("udp", nil) + switch sendproto { + case "UDP": + conn, err := net.ListenUDP("udp", udpSourceAddr[target]) if err != nil { - log.Panicln(err) + conn, err = net.ListenUDP("udp", nil) + if err != nil { + log.Printf("error in connection %s\n", err) + return + } } - conn.WriteToUDP(buff, udpAddr[target]) + conn.WriteToUDP(buff, getUDPDestnAddr(target)) conn.Close() - case "TCP": - if verbose { - log.Printf("Sending to target: %s", target) - } + case "TCP": + if verbose { + log.Printf("Sending to target: %s", target) + } for i := 0; i < TCPMaxRetries; i++ { conn, err := net.DialTimeout("tcp", target, TCPtimeout) if err != nil { @@ -292,29 +335,9 @@ func handleBuff(buff []byte) { if err == nil { - target := hashRing.GetNode(metric).Server - ctarget := target - - // resolve and cache - if dnscache { - gettarget, found := c.Get(target) - if found { - ctarget = gettarget.(string) - if verbose { - log.Printf("Found in cache target %s (%s)", target, ctarget) - } - } else { - targetaddr, err := net.ResolveUDPAddr("udp", target) - if verbose { - log.Printf("Not found in cache adding target %s (%s)", target, ctarget) - } - if err != nil { - log.Printf("Error resolving target %s", target) - } - c.Set(target, targetaddr.String(), dnscacheExp) - ctarget = targetaddr.String() - } - } + target := hashRing.GetNode(metric).Server + ctarget := target + // check built packet size and send if metric doesn't fit if packets[target].Len()+size > packetLen { sendPacket(packets[target].Bytes(), ctarget, sendproto, TCPtimeout, boff) @@ -448,7 +471,8 @@ func readUDP(ip string, port int, c chan []byte) { timeout = true err = sock.SetDeadline(time.Now().Add(time.Second)) if err != nil { - log.Panicln(err) + log.Printf("Set deadline Error: %s\n", err) + continue } } else { log.Printf("Read Error: %s\n", err) @@ -493,6 +517,7 @@ func runServer(host string, port int) { func main() { var bindAddress string var port int + var udpSingleSource bool flag.IntVar(&port, "port", 9125, "Port to listen on") flag.IntVar(&port, "p", 9125, "Port to listen on") @@ -510,10 +535,10 @@ func main() { flag.BoolVar(&verbose, "verbose", false, "Verbose output") flag.BoolVar(&verbose, "v", false, "Verbose output") - flag.BoolVar(&dnscache, "dnscache", false, "Enable in app DNS cache for resolved TCP sendout sharded endpoints") - flag.DurationVar(&dnscacheTime, "dnscache-time", 1*time.Second, "Time we cache resolved adresses of sharded endpoint") - flag.DurationVar(&dnscachePurge, "dnscache-purge", 5*time.Second, "When we purge stale elements in cache") - flag.DurationVar(&dnscacheExp, "dnscache-expiration", 1*time.Second, "When set new object after resolv then use this expiration time in cache") + flag.BoolVar(&dnscache, "dnscache", false, "Enable in app DNS cache for resolving sharded endpoints") + flag.DurationVar(&dnscacheTime, "dnscache-time", 30*time.Second, "Time we cache resolved adresses of sharded endpoint") + flag.DurationVar(&dnscachePurge, "dnscache-purge", 60*time.Second, "When we purge stale elements in cache") + flag.DurationVar(&dnscacheExp, "dnscache-expiration", 30*time.Second, "When set new object after resolv then use this expiration time in cache") flag.StringVar(&sendproto, "sendproto", "UDP", "IP Protocol for sending data: TCP, UDP, or TEST") flag.IntVar(&packetLen, "packetlen", 1400, "Max packet length. Must be lower than MTU plus IPv4 and UDP headers to avoid fragmentation.") @@ -528,7 +553,7 @@ func main() { flag.DurationVar(&TCPMinBackoff, "backoff-min", 50*time.Millisecond, "Backoff minimal (integer) time in Millisecond") flag.DurationVar(&TCPMaxBackoff, "backoff-max", 1000*time.Millisecond, "Backoff maximal (integer) time in Millisecond") flag.Float64Var(&TCPFactorBackoff, "backoff-factor", 1.5, "Backoff factor (float)") - + flag.BoolVar(&udpSingleSource, "udp-single-source", false, "To use a single source for outbound udp traffic") defaultBufferSize, err := getSockBufferMaxSize() if err != nil { @@ -554,6 +579,9 @@ func main() { }() } + // udpAddr is a mapping of HOST:PORT:INSTANCE to a UDPAddr object + udpAddr := make(map[string]*net.UDPAddr) + for _, v := range flag.Args() { var addr *net.UDPAddr var err error @@ -584,6 +612,11 @@ func main() { hashRing.AddNode(Node{v, ""}) } } + if udpSingleSource { + for k := range udpAddr { + udpSourceAddr[k] = getUdpLocalAddr() + } + } epochTime = time.Now().Unix() runServer(bindAddress, port)