Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
FROM golang:1.12.1-alpine AS builder

RUN apk add --no-cache git tzdata

WORKDIR /go/src/statsrelay

ADD . .

RUN go get
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build

FROM alpine:latest
WORKDIR /root

# Copy our static executable.
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/src/statsrelay/statsrelay .

ENTRYPOINT ["./statsrelay"]
117 changes: 75 additions & 42 deletions statsrelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"github.com/jpillora/backoff"
"github.com/patrickmn/go-cache"
"io/ioutil"
"log"
"net"
Expand All @@ -19,10 +20,9 @@ import (
"sync"
"syscall"
"time"
"github.com/patrickmn/go-cache"
)

const VERSION string = "0.0.9"
const VERSION string = "0.1.0"

// BUFFERSIZE controls the size of the [...]byte array used to read UDP data
// off the wire and into local memory. Metrics are separated by \n
Expand All @@ -47,8 +47,8 @@ var metricsPrefix string
// Default is empty
var metricTags string

// udpAddr is a mapping of HOST:PORT:INSTANCE to a UDPAddr object
var udpAddr = make(map[string]*net.UDPAddr)
// udpSourceAddr is a mapping of HOST:PORT:INSTANCE to a UDPAddr object used when udp single source is enabled
var udpSourceAddr = make(map[string]*net.UDPAddr)

// tcpAddr is a mapping of HOST:PORT:INSTANCE to a TCPAddr object
var tcpAddr = make(map[string]*net.TCPAddr)
Expand Down Expand Up @@ -121,6 +121,15 @@ var c = cache.New(dnscacheTime, dnscachePurge)
// ctarget cached target used for resolving
var ctarget string

func getUdpLocalAddr() *net.UDPAddr {
conn, err := net.ListenUDP("udp", nil)
if err != nil {
log.Fatal(err)
}
conn.Close()
return conn.LocalAddr().(*net.UDPAddr)
}

// sockBufferMaxSize() returns the maximum size that the UDP receive buffer
// in the kernel can be set to. In bytes.
func getSockBufferMaxSize() (int, error) {
Expand Down Expand Up @@ -191,21 +200,55 @@ func genTags(metric, metricTags string) string {
return fmt.Sprintf("%s|#%s", metric, metricTags)
}

func getUDPDestnAddr(target string) *net.UDPAddr {
if !dnscache {
targetaddr, err := net.ResolveUDPAddr("udp", target)
if err != nil {
log.Printf("Error resolving target %s", target)
return nil
}
return targetaddr
}

gettarget, found := c.Get(target)
if found {
if verbose {
log.Printf("Found in cache target %s (%s)", target, gettarget)
}
return gettarget.(*net.UDPAddr)
} else {
targetaddr, err := net.ResolveUDPAddr("udp", target)
if verbose {
log.Printf("Not found in cache adding target %s (%s)", target, targetaddr)
}
if err != nil {
log.Printf("Error resolving target %s", target)
return nil
}
c.Set(target, targetaddr, dnscacheExp)
return targetaddr
}
}

// sendPacket takes a []byte and writes that directly to a UDP socket
// that was assigned for target.
func sendPacket(buff []byte, target string, sendproto string, TCPtimeout time.Duration, boff *backoff.Backoff) {
switch sendproto {
case "UDP":
conn, err := net.ListenUDP("udp", nil)
switch sendproto {
case "UDP":
conn, err := net.ListenUDP("udp", udpSourceAddr[target])
if err != nil {
log.Panicln(err)
conn, err = net.ListenUDP("udp", nil)
if err != nil {
log.Printf("error in connection %s\n", err)
return
}
}
conn.WriteToUDP(buff, udpAddr[target])
conn.WriteToUDP(buff, getUDPDestnAddr(target))
conn.Close()
case "TCP":
if verbose {
log.Printf("Sending to target: %s", target)
}
case "TCP":
if verbose {
log.Printf("Sending to target: %s", target)
}
for i := 0; i < TCPMaxRetries; i++ {
conn, err := net.DialTimeout("tcp", target, TCPtimeout)
if err != nil {
Expand Down Expand Up @@ -292,29 +335,9 @@ func handleBuff(buff []byte) {

if err == nil {

target := hashRing.GetNode(metric).Server
ctarget := target

// resolve and cache
if dnscache {
gettarget, found := c.Get(target)
if found {
ctarget = gettarget.(string)
if verbose {
log.Printf("Found in cache target %s (%s)", target, ctarget)
}
} else {
targetaddr, err := net.ResolveUDPAddr("udp", target)
if verbose {
log.Printf("Not found in cache adding target %s (%s)", target, ctarget)
}
if err != nil {
log.Printf("Error resolving target %s", target)
}
c.Set(target, targetaddr.String(), dnscacheExp)
ctarget = targetaddr.String()
}
}
target := hashRing.GetNode(metric).Server
ctarget := target

// check built packet size and send if metric doesn't fit
if packets[target].Len()+size > packetLen {
sendPacket(packets[target].Bytes(), ctarget, sendproto, TCPtimeout, boff)
Expand Down Expand Up @@ -448,7 +471,8 @@ func readUDP(ip string, port int, c chan []byte) {
timeout = true
err = sock.SetDeadline(time.Now().Add(time.Second))
if err != nil {
log.Panicln(err)
log.Printf("Set deadline Error: %s\n", err)
continue
}
} else {
log.Printf("Read Error: %s\n", err)
Expand Down Expand Up @@ -493,6 +517,7 @@ func runServer(host string, port int) {
func main() {
var bindAddress string
var port int
var udpSingleSource bool

flag.IntVar(&port, "port", 9125, "Port to listen on")
flag.IntVar(&port, "p", 9125, "Port to listen on")
Expand All @@ -510,10 +535,10 @@ func main() {
flag.BoolVar(&verbose, "verbose", false, "Verbose output")
flag.BoolVar(&verbose, "v", false, "Verbose output")

flag.BoolVar(&dnscache, "dnscache", false, "Enable in app DNS cache for resolved TCP sendout sharded endpoints")
flag.DurationVar(&dnscacheTime, "dnscache-time", 1*time.Second, "Time we cache resolved adresses of sharded endpoint")
flag.DurationVar(&dnscachePurge, "dnscache-purge", 5*time.Second, "When we purge stale elements in cache")
flag.DurationVar(&dnscacheExp, "dnscache-expiration", 1*time.Second, "When set new object after resolv then use this expiration time in cache")
flag.BoolVar(&dnscache, "dnscache", false, "Enable in app DNS cache for resolving sharded endpoints")
flag.DurationVar(&dnscacheTime, "dnscache-time", 30*time.Second, "Time we cache resolved adresses of sharded endpoint")
flag.DurationVar(&dnscachePurge, "dnscache-purge", 60*time.Second, "When we purge stale elements in cache")
flag.DurationVar(&dnscacheExp, "dnscache-expiration", 30*time.Second, "When set new object after resolv then use this expiration time in cache")

flag.StringVar(&sendproto, "sendproto", "UDP", "IP Protocol for sending data: TCP, UDP, or TEST")
flag.IntVar(&packetLen, "packetlen", 1400, "Max packet length. Must be lower than MTU plus IPv4 and UDP headers to avoid fragmentation.")
Expand All @@ -528,7 +553,7 @@ func main() {
flag.DurationVar(&TCPMinBackoff, "backoff-min", 50*time.Millisecond, "Backoff minimal (integer) time in Millisecond")
flag.DurationVar(&TCPMaxBackoff, "backoff-max", 1000*time.Millisecond, "Backoff maximal (integer) time in Millisecond")
flag.Float64Var(&TCPFactorBackoff, "backoff-factor", 1.5, "Backoff factor (float)")

flag.BoolVar(&udpSingleSource, "udp-single-source", false, "To use a single source for outbound udp traffic")

defaultBufferSize, err := getSockBufferMaxSize()
if err != nil {
Expand All @@ -554,6 +579,9 @@ func main() {
}()
}

// udpAddr is a mapping of HOST:PORT:INSTANCE to a UDPAddr object
udpAddr := make(map[string]*net.UDPAddr)

for _, v := range flag.Args() {
var addr *net.UDPAddr
var err error
Expand Down Expand Up @@ -584,6 +612,11 @@ func main() {
hashRing.AddNode(Node{v, ""})
}
}
if udpSingleSource {
for k := range udpAddr {
udpSourceAddr[k] = getUdpLocalAddr()
}
}

epochTime = time.Now().Unix()
runServer(bindAddress, port)
Expand Down