Skip to content

Commit

Permalink
Merge pull request #401 from nats-io/wait_server_ready
Browse files Browse the repository at this point in the history
Replace GetListenEndpoint() with ReadyForConnections()
  • Loading branch information
derekcollison authored Dec 9, 2016
2 parents 0598bfa + 5f471b6 commit c39204b
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 147 deletions.
6 changes: 3 additions & 3 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,14 +598,14 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
return
}

// Let them know we are up
close(ch)

// Setup state that can enable shutdown
s.mu.Lock()
s.routeListener = l
s.mu.Unlock()

// Let them know we are up
close(ch)

tmpDelay := ACCEPT_MIN_SLEEP

for s.isRunning() {
Expand Down
16 changes: 13 additions & 3 deletions server/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,19 @@ func TestRouteUseIPv6(t *testing.T) {
routeUp := false
timeout := time.Now().Add(5 * time.Second)
for time.Now().Before(timeout) && !routeUp {
if s.GetRouteListenEndpoint() == "" {
time.Sleep(time.Second)
continue
// We know that the server is local and listening to
// all IPv6 interfaces. Try connect using IPv6 loopback.
if conn, err := net.Dial("tcp", "[::1]:6222"); err != nil {
// Travis seem to have the server actually listening to 0.0.0.0,
// so try with 127.0.0.1
if conn, err := net.Dial("tcp", "127.0.0.1:6222"); err != nil {
time.Sleep(time.Second)
continue
} else {
conn.Close()
}
} else {
conn.Close()
}
routeUp = true
}
Expand Down
58 changes: 14 additions & 44 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,51 +839,21 @@ func (s *Server) Addr() net.Addr {
return s.listener.Addr()
}

// GetListenEndpoint will return a string of the form host:port suitable for
// a connect. Will return empty string if the server is not ready to accept
// client connections.
func (s *Server) GetListenEndpoint() string {
s.mu.Lock()
defer s.mu.Unlock()
// Wait for the listener to be set, see note about RANDOM_PORT below
if s.listener == nil {
return ""
}

host := s.opts.Host

// On windows, a connect with host "0.0.0.0" (or "::") will fail.
// We replace it with "localhost" when that's the case.
if host == "0.0.0.0" || host == "::" || host == "[::]" {
host = "localhost"
}

// Return the opts's Host and Port. Note that the Port may be set
// when the listener is started, due to the use of RANDOM_PORT
return net.JoinHostPort(host, strconv.Itoa(s.opts.Port))
}

// GetRouteListenEndpoint will return a string of the form host:port suitable
// for a connect. Will return empty string if the server is not configured for
// routing or not ready to accept route connections.
func (s *Server) GetRouteListenEndpoint() string {
s.mu.Lock()
defer s.mu.Unlock()

if s.routeListener == nil {
return ""
}

host := s.opts.Cluster.Host

// On windows, a connect with host "0.0.0.0" (or "::") will fail.
// We replace it with "localhost" when that's the case.
if host == "0.0.0.0" || host == "::" || host == "[::]" {
host = "localhost"
// ReadyForConnections returns `true` if the server is ready to accept client
// and, if routing is enabled, route connections. If after the duration
// `dur` the server is still not ready, returns `false`.
func (s *Server) ReadyForConnections(dur time.Duration) bool {
end := time.Now().Add(dur)
for time.Now().Before(end) {
s.mu.Lock()
ok := s.listener != nil && (s.opts.Cluster.Port == 0 || s.routeListener != nil)
s.mu.Unlock()
if ok {
return true
}
time.Sleep(25 * time.Millisecond)
}

// Return the cluster's Host and Port.
return net.JoinHostPort(host, strconv.Itoa(s.opts.Cluster.Port))
return false
}

// ID returns the server's ID
Expand Down
27 changes: 4 additions & 23 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,11 @@ func RunServer(opts *Options) *Server {
// Run server in Go routine.
go s.Start()

end := time.Now().Add(10 * time.Second)
for time.Now().Before(end) {
addr := s.GetListenEndpoint()
if addr == "" {
time.Sleep(10 * time.Millisecond)
// Retry. We might take a little while to open a connection.
continue
}
conn, err := net.Dial("tcp", addr)
if err != nil {
// Retry after 50ms
time.Sleep(50 * time.Millisecond)
continue
}
conn.Close()
// Wait a bit to give a chance to the server to remove this
// "client" from its state, which may otherwise interfere with
// some tests.
time.Sleep(25 * time.Millisecond)

return s
// Wait for accept loop(s) to be started
if !s.ReadyForConnections(10 * time.Second) {
panic("Unable to start NATS Server in Go Routine")
}
panic("Unable to start NATS Server in Go Routine")

return s
}

func TestStartupAndShutdown(t *testing.T) {
Expand Down
61 changes: 8 additions & 53 deletions test/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,6 @@ import (

const clientProtoInfo = 1

func shutdownServerAndWait(t *testing.T, s *server.Server) bool {
listenSpec := s.GetListenEndpoint()
routeListenSpec := s.GetRouteListenEndpoint()

s.Shutdown()

// For now, do this only on Windows. Lots of tests would fail
// without this because the listen port would linger from one
// test to another causing failures.
checkShutdown := func(listen string) bool {
down := false
maxTime := time.Now().Add(5 * time.Second)
for time.Now().Before(maxTime) {
conn, err := net.Dial("tcp", listen)
if err != nil {
down = true
break
}
conn.Close()
// Retry after 50ms
time.Sleep(50 * time.Millisecond)
}
return down
}
if listenSpec != "" {
if !checkShutdown(listenSpec) {
return false
}
}
if routeListenSpec != "" {
if !checkShutdown(routeListenSpec) {
return false
}
}
return true
}

func runRouteServer(t *testing.T) (*server.Server, *server.Options) {
return RunServerWithConfig("./configs/cluster.conf")
}
Expand Down Expand Up @@ -196,9 +159,7 @@ func TestSendRouteSubAndUnsub(t *testing.T) {

// Explicitly shutdown the server, otherwise this test would
// cause following test to fail.
if down := shutdownServerAndWait(t, s); !down {
t.Fatal("Unable to verify server was shutdown")
}
s.Shutdown()
}

func TestSendRouteSolicit(t *testing.T) {
Expand Down Expand Up @@ -349,12 +310,6 @@ func TestRouteQueueSemantics(t *testing.T) {

defer client.Close()

// Make sure client connection is fully processed before creating route
// connection, so we are sure that client ID will be "2" ("1" being used
// by the connection created to check the server is started)
clientSend("PING\r\n")
clientExpect(pongRe)

route := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port)
defer route.Close()

Expand All @@ -364,9 +319,9 @@ func TestRouteQueueSemantics(t *testing.T) {
expectMsgs := expectMsgsCommand(t, routeExpect)

// Express multiple interest on this route for foo, queue group bar.
qrsid1 := "QRSID:2:1"
qrsid1 := "QRSID:1:1"
routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid1))
qrsid2 := "QRSID:2:2"
qrsid2 := "QRSID:1:2"
routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid2))

// Use ping roundtrip to make sure its processed.
Expand All @@ -384,7 +339,7 @@ func TestRouteQueueSemantics(t *testing.T) {
checkMsg(t, matches[0], "foo", "", "", "2", "ok")

// Add normal Interest as well to route interest.
routeSend("SUB foo RSID:2:4\r\n")
routeSend("SUB foo RSID:1:4\r\n")

// Use ping roundtrip to make sure its processed.
routeSend("PING\r\n")
Expand All @@ -400,8 +355,8 @@ func TestRouteQueueSemantics(t *testing.T) {
matches = expectMsgs(2)

// Expect first to be the normal subscriber, next will be the queue one.
if string(matches[0][sidIndex]) != "RSID:2:4" &&
string(matches[1][sidIndex]) != "RSID:2:4" {
if string(matches[0][sidIndex]) != "RSID:1:4" &&
string(matches[1][sidIndex]) != "RSID:1:4" {
t.Fatalf("Did not received routed sid\n")
}
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
Expand Down Expand Up @@ -432,9 +387,9 @@ func TestRouteQueueSemantics(t *testing.T) {
routeExpect(subRe)

// Deliver a MSG from the route itself, make sure the client receives both.
routeSend("MSG foo RSID:2:1 2\r\nok\r\n")
routeSend("MSG foo RSID:1:1 2\r\nok\r\n")
// Queue group one.
routeSend("MSG foo QRSID:2:2 2\r\nok\r\n")
routeSend("MSG foo QRSID:1:2 2\r\nok\r\n")

// Use ping roundtrip to make sure its processed.
routeSend("PING\r\n")
Expand Down
25 changes: 4 additions & 21 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,28 +97,11 @@ func RunServerWithAuth(opts *server.Options, auth server.Auth) *server.Server {
// Run server in Go routine.
go s.Start()

end := time.Now().Add(10 * time.Second)
for time.Now().Before(end) {
addr := s.GetListenEndpoint()
if addr == "" {
time.Sleep(50 * time.Millisecond)
// Retry. We might take a little while to open a connection.
continue
}
conn, err := net.Dial("tcp", addr)
if err != nil {
// Retry after 50ms
time.Sleep(50 * time.Millisecond)
continue
}
conn.Close()
// Wait a bit to give a chance to the server to remove this
// "client" from its state, which may otherwise interfere with
// some tests.
time.Sleep(25 * time.Millisecond)
return s
// Wait for accept loop(s) to be started
if !s.ReadyForConnections(10 * time.Second) {
panic("Unable to start NATS Server in Go Routine")
}
panic("Unable to start NATS Server in Go Routine")
return s
}

func stackFatalf(t tLogger, f string, args ...interface{}) {
Expand Down

0 comments on commit c39204b

Please sign in to comment.