From fe3b8f2aa1826d85d78ee4d17502ae03e27afb26 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 18 Sep 2015 07:41:33 -0700 Subject: [PATCH] BufSize option, bump for go1.5.1, bump version --- Dockerfile | 3 +-- gnatsd.go | 4 ++++ server/client.go | 11 +++++------ server/client_test.go | 2 +- server/const.go | 6 +++++- server/opts.go | 4 ++++ server/opts_test.go | 1 + server/server.go | 1 + test/test.go | 2 +- 9 files changed, 23 insertions(+), 11 deletions(-) diff --git a/Dockerfile b/Dockerfile index f0114d3614b..ccf55e04f4e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.5 +FROM golang:1.5.1 MAINTAINER Derek Collison @@ -10,4 +10,3 @@ RUN CGO_ENABLED=0 go install -v -a -tags netgo -installsuffix netgo -ldflags "-s EXPOSE 4222 8222 ENTRYPOINT ["gnatsd"] CMD ["--help"] - diff --git a/gnatsd.go b/gnatsd.go index 246f0016605..5cbee58f2d3 100644 --- a/gnatsd.go +++ b/gnatsd.go @@ -53,6 +53,10 @@ func main() { flag.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port") flag.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.") + // Not public per se, will be replaced with dynamic system, but can be used to lower memory footprint when + // lots of connections present. + flag.IntVar(&opts.BufSize, "bs", 0, "Read/Write buffer size per client connection.") + flag.Usage = server.Usage flag.Parse() diff --git a/server/client.go b/server/client.go index 169f744855a..10a20d45cac 100644 --- a/server/client.go +++ b/server/client.go @@ -17,8 +17,6 @@ import ( ) const ( - // The size of the bufio reader/writer on top of the socket. - defaultBufSize = 32768 // Scratch buffer size for the processMsg() calls. msgScratchSize = 512 msgHeadProto = "MSG " @@ -94,7 +92,7 @@ func init() { func (c *client) initClient() { s := c.srv c.cid = atomic.AddUint64(&s.gcid, 1) - c.bw = bufio.NewWriterSize(c.nc, defaultBufSize) + c.bw = bufio.NewWriterSize(c.nc, s.opts.BufSize) c.subs = hashmap.New() // This is a scratch buffer used for processMsg() @@ -123,8 +121,8 @@ func (c *client) initClient() { // No clue why, but this stalls and kills performance on Mac (Mavericks). // // if ip, ok := c.nc.(*net.TCPConn); ok { - // ip.SetReadBuffer(defaultBufSize) - // ip.SetWriteBuffer(2 * defaultBufSize) + // ip.SetReadBuffer(s.opts.BufSize) + // ip.SetWriteBuffer(2 * s.opts.BufSize) // } // Set the Ping timer @@ -139,13 +137,14 @@ func (c *client) readLoop() { // We check for that after the loop, but want to avoid a nil dereference c.mu.Lock() nc := c.nc + s := c.srv c.mu.Unlock() if nc == nil { return } - b := make([]byte, defaultBufSize) + b := make([]byte, s.opts.BufSize) for { n, err := nc.Read(b) diff --git a/server/client_test.go b/server/client_test.go index 46b95413535..4a787f1954a 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -46,7 +46,7 @@ var defaultServerOptions = Options{ func rawSetup(serverOptions Options) (*Server, *client, *bufio.Reader, string) { cli, srv := net.Pipe() - cr := bufio.NewReaderSize(cli, defaultBufSize) + cr := bufio.NewReaderSize(cli, DEFAULT_BUF_SIZE) s := New(&serverOptions) if serverOptions.Authorization != "" { s.SetAuthMethod(&mockAuth{}) diff --git a/server/const.go b/server/const.go index 5e0c11a2ff7..46c5e668aab 100644 --- a/server/const.go +++ b/server/const.go @@ -8,7 +8,7 @@ import ( const ( // VERSION is the current version for the server. - VERSION = "0.6.6" + VERSION = "0.6.8" // DEFAULT_PORT is the deault port for client connections. DEFAULT_PORT = 4222 @@ -82,4 +82,8 @@ const ( // MAX_PUB_ARGS Maximum possible number of arguments from PUB proto. MAX_PUB_ARGS = 3 + + // Default Buffer size for reads and writes per connection. Will be replaced by dynamic + // system in the long run. + DEFAULT_BUF_SIZE = 32768 ) diff --git a/server/opts.go b/server/opts.go index aabf1965b01..c2865ad70ec 100644 --- a/server/opts.go +++ b/server/opts.go @@ -47,6 +47,7 @@ type Options struct { RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` RoutesStr string `json:"-"` + BufSize int `json:"-"` } type authorization struct { @@ -366,4 +367,7 @@ func processOptions(opts *Options) { if opts.MaxPending == 0 { opts.MaxPending = MAX_PENDING_SIZE } + if opts.BufSize == 0 { + opts.BufSize = DEFAULT_BUF_SIZE + } } diff --git a/server/opts_test.go b/server/opts_test.go index 0ebcbeac135..821dc51a20f 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -22,6 +22,7 @@ func TestDefaultOptions(t *testing.T) { MaxPayload: MAX_PAYLOAD_SIZE, MaxPending: MAX_PENDING_SIZE, ClusterAuthTimeout: float64(AUTH_TIMEOUT) / float64(time.Second), + BufSize: DEFAULT_BUF_SIZE, } opts := &Options{} diff --git a/server/server.go b/server/server.go index e4c4b27336a..82e50e8db5e 100644 --- a/server/server.go +++ b/server/server.go @@ -278,6 +278,7 @@ func (s *Server) AcceptLoop() { Noticef("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { + fmt.Printf("could not listen on port for %s, %v\n", hp, e) Fatalf("Error listening on port: %s, %q", hp, e) return } diff --git a/test/test.go b/test/test.go index e3a6ba59f71..59a6857c681 100644 --- a/test/test.go +++ b/test/test.go @@ -78,7 +78,7 @@ func RunServerWithAuth(opts *server.Options, auth server.Auth) *server.Server { for time.Now().Before(end) { addr := s.Addr() if addr == nil { - time.Sleep(10 * time.Millisecond) + time.Sleep(50 * time.Millisecond) // Retry. We might take a little while to open a connection. continue }