Skip to content

Commit

Permalink
Merge pull request #116 from nats-io/race
Browse files Browse the repository at this point in the history
Race Fixes for 1.5 use of GOMAXPROCS
  • Loading branch information
derekcollison committed Sep 18, 2015
2 parents 7620f4a + b30af11 commit f26e2e4
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
language: go
go:
- 1.4
- 1.5
install:
- DST=~/gopath/src/github.com/nats-io
- mkdir -p "$DST"
Expand All @@ -13,7 +14,7 @@ script:
- ./travis/gofmt.sh
- ./travis/govet.sh
- go test -i -race ./...
- GOMAXPROCS=1 go test -v -race ./...
- go test -v -race ./...
- ./travis/coveralls-script.sh
env:
global:
Expand Down
2 changes: 0 additions & 2 deletions gnatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,11 @@ func configureAuth(s *server.Server, opts *server.Options) {
Username: opts.Username,
Password: opts.Password,
}

s.SetAuthMethod(auth)
} else if opts.Authorization != "" {
auth := &auth.Token{
Token: opts.Authorization,
}

s.SetAuthMethod(auth)
}
}
Expand Down
7 changes: 5 additions & 2 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,9 @@ func (c *client) sendErr(err string) {
c.mu.Lock()
if c.bw != nil {
c.bw.WriteString(fmt.Sprintf("-ERR '%s'\r\n", err))
c.pcd[c] = needFlush
// Flush errors in place.
c.bw.Flush()
//c.pcd[c] = needFlush
}
c.mu.Unlock()
}
Expand All @@ -326,12 +328,13 @@ func (c *client) sendOK() {
}

func (c *client) processPing() {
c.mu.Lock()
c.traceInOp("PING", nil)
if c.nc == nil {
c.mu.Unlock()
return
}
c.traceOutOp("PONG", nil)
c.mu.Lock()
c.bw.WriteString("PONG\r\n")
err := c.bw.Flush()
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
r := &route{didSolicit: didSolicit}
c := &client{srv: s, nc: conn, opts: clientOpts{}, typ: ROUTER, route: r}

// Grab JSON info string
s.mu.Lock()
info := s.routeInfoJSON
s.mu.Unlock()

// Grab lock
c.mu.Lock()

Expand All @@ -104,7 +109,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
}

// Send our info to the other side.
s.sendInfo(c)
s.sendInfo(c, info)

// Check for Auth required state for incoming connections.
if s.routeInfo.AuthRequired && !didSolicit {
Expand Down Expand Up @@ -221,6 +226,9 @@ func (s *Server) broadcastToRoutes(proto string) {
// broadcastSubscribe will forward a client subscription
// to all active routes.
func (s *Server) broadcastSubscribe(sub *subscription) {
if s.numRoutes() == 0 {
return
}
rsid := routeSid(sub)
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid)
s.broadcastToRoutes(proto)
Expand All @@ -229,6 +237,9 @@ func (s *Server) broadcastSubscribe(sub *subscription) {
// broadcastUnSubscribe will forward a client unsubscribe
// action to all active routes.
func (s *Server) broadcastUnSubscribe(sub *subscription) {
if s.numRoutes() == 0 {
return
}
rsid := routeSid(sub)
maxStr := _EMPTY_
// Set max if we have it set and have not tripped auto-unsubscribe
Expand Down
26 changes: 15 additions & 11 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func New(opts *Options) *Server {

// Sets the authentication method
func (s *Server) SetAuthMethod(authMethod Auth) {
s.mu.Lock()
defer s.mu.Unlock()

s.info.AuthRequired = true
s.auth = authMethod

Expand Down Expand Up @@ -385,6 +388,12 @@ func (s *Server) StartHTTPMonitoring() {
func (s *Server) createClient(conn net.Conn) *client {
c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: s.info.MaxPayload}

// Grab JSON info string
s.mu.Lock()
info := s.infoJSON
authRequired := s.info.AuthRequired
s.mu.Unlock()

// Grab lock
c.mu.Lock()

Expand All @@ -393,15 +402,15 @@ func (s *Server) createClient(conn net.Conn) *client {

c.Debugf("Client connection created")

// Send our information.
s.sendInfo(c)

// Check for Auth
if s.info.AuthRequired {
if authRequired {
ttl := secondsToDuration(s.opts.AuthTimeout)
c.setAuthTimer(ttl)
}

// Send our information.
s.sendInfo(c, info)

// Unlock to register
c.mu.Unlock()

Expand All @@ -414,13 +423,8 @@ func (s *Server) createClient(conn net.Conn) *client {
}

// Assume the lock is held upon entry.
func (s *Server) sendInfo(c *client) {
switch c.typ {
case CLIENT:
c.nc.Write(s.infoJSON)
case ROUTER:
c.nc.Write(s.routeInfoJSON)
}
func (s *Server) sendInfo(c *client, info []byte) {
c.nc.Write(info)
}

func (s *Server) checkClientAuth(c *client) bool {
Expand Down
10 changes: 6 additions & 4 deletions test/pedantic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (

func runPedanticServer() *server.Server {
opts := DefaultTestOptions

opts.NoLog = false
opts.Trace = true

opts.Port = PROTO_TEST_PORT
return RunServer(&opts)
}
Expand All @@ -23,8 +27,7 @@ func TestPedanticSub(t *testing.T) {

send := sendCommand(t, c)
expect := expectCommand(t, c)
doConnect(t, c, true, true, false)
expect(okRe)
doConnect(t, c, false, true, false)

// Ping should still be same
send("PING\r\n")
Expand Down Expand Up @@ -69,8 +72,7 @@ func TestPedanticPub(t *testing.T) {

send := sendCommand(t, c)
expect := expectCommand(t, c)
doConnect(t, c, true, true, false)
expect(okRe)
doConnect(t, c, false, true, false)

// Ping should still be same
send("PING\r\n")
Expand Down
2 changes: 1 addition & 1 deletion test/test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2014 Apcera Inc. All rights reserved.
// Copyright 2012-2015 Apcera Inc. All rights reserved.

package test

Expand Down

0 comments on commit f26e2e4

Please sign in to comment.