diff --git a/server/route.go b/server/route.go index 86a2586d..f9e04343 100644 --- a/server/route.go +++ b/server/route.go @@ -598,14 +598,14 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { return } - // Let them know we are up - close(ch) - // Setup state that can enable shutdown s.mu.Lock() s.routeListener = l s.mu.Unlock() + // Let them know we are up + close(ch) + tmpDelay := ACCEPT_MIN_SLEEP for s.isRunning() { diff --git a/server/routes_test.go b/server/routes_test.go index 6eb8ff1b..14cbeeb7 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -455,9 +455,19 @@ func TestRouteUseIPv6(t *testing.T) { routeUp := false timeout := time.Now().Add(5 * time.Second) for time.Now().Before(timeout) && !routeUp { - if s.GetRouteListenEndpoint() == "" { - time.Sleep(time.Second) - continue + // We know that the server is local and listening to + // all IPv6 interfaces. Try connect using IPv6 loopback. + if conn, err := net.Dial("tcp", "[::1]:6222"); err != nil { + // Travis seem to have the server actually listening to 0.0.0.0, + // so try with 127.0.0.1 + if conn, err := net.Dial("tcp", "127.0.0.1:6222"); err != nil { + time.Sleep(time.Second) + continue + } else { + conn.Close() + } + } else { + conn.Close() } routeUp = true } diff --git a/server/server.go b/server/server.go index adbe5358..7d616d2b 100644 --- a/server/server.go +++ b/server/server.go @@ -839,51 +839,21 @@ func (s *Server) Addr() net.Addr { return s.listener.Addr() } -// GetListenEndpoint will return a string of the form host:port suitable for -// a connect. Will return empty string if the server is not ready to accept -// client connections. -func (s *Server) GetListenEndpoint() string { - s.mu.Lock() - defer s.mu.Unlock() - // Wait for the listener to be set, see note about RANDOM_PORT below - if s.listener == nil { - return "" +// ReadyForConnections returns `true` if the server is ready to accept client +// and, if routing is enabled, route connections. If after the duration +// `dur` the server is still not ready, returns `false`. +func (s *Server) ReadyForConnections(dur time.Duration) bool { + end := time.Now().Add(dur) + for time.Now().Before(end) { + s.mu.Lock() + ok := s.listener != nil && (s.opts.Cluster.Port == 0 || s.routeListener != nil) + s.mu.Unlock() + if ok { + return true + } + time.Sleep(25 * time.Millisecond) } - - host := s.opts.Host - - // On windows, a connect with host "0.0.0.0" (or "::") will fail. - // We replace it with "localhost" when that's the case. - if host == "0.0.0.0" || host == "::" || host == "[::]" { - host = "localhost" - } - - // Return the opts's Host and Port. Note that the Port may be set - // when the listener is started, due to the use of RANDOM_PORT - return net.JoinHostPort(host, strconv.Itoa(s.opts.Port)) -} - -// GetRouteListenEndpoint will return a string of the form host:port suitable -// for a connect. Will return empty string if the server is not configured for -// routing or not ready to accept route connections. -func (s *Server) GetRouteListenEndpoint() string { - s.mu.Lock() - defer s.mu.Unlock() - - if s.routeListener == nil { - return "" - } - - host := s.opts.Cluster.Host - - // On windows, a connect with host "0.0.0.0" (or "::") will fail. - // We replace it with "localhost" when that's the case. - if host == "0.0.0.0" || host == "::" || host == "[::]" { - host = "localhost" - } - - // Return the cluster's Host and Port. - return net.JoinHostPort(host, strconv.Itoa(s.opts.Cluster.Port)) + return false } // ID returns the server's ID diff --git a/server/server_test.go b/server/server_test.go index 2523b3cb..879759cd 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -36,30 +36,11 @@ func RunServer(opts *Options) *Server { // Run server in Go routine. go s.Start() - end := time.Now().Add(10 * time.Second) - for time.Now().Before(end) { - addr := s.GetListenEndpoint() - if addr == "" { - time.Sleep(10 * time.Millisecond) - // Retry. We might take a little while to open a connection. - continue - } - conn, err := net.Dial("tcp", addr) - if err != nil { - // Retry after 50ms - time.Sleep(50 * time.Millisecond) - continue - } - conn.Close() - // Wait a bit to give a chance to the server to remove this - // "client" from its state, which may otherwise interfere with - // some tests. - time.Sleep(25 * time.Millisecond) - - return s + // Wait for accept loop(s) to be started + if !s.ReadyForConnections(10 * time.Second) { + panic("Unable to start NATS Server in Go Routine") } - panic("Unable to start NATS Server in Go Routine") - + return s } func TestStartupAndShutdown(t *testing.T) { diff --git a/test/routes_test.go b/test/routes_test.go index c45f82e0..4ccd58ba 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -21,43 +21,6 @@ import ( const clientProtoInfo = 1 -func shutdownServerAndWait(t *testing.T, s *server.Server) bool { - listenSpec := s.GetListenEndpoint() - routeListenSpec := s.GetRouteListenEndpoint() - - s.Shutdown() - - // For now, do this only on Windows. Lots of tests would fail - // without this because the listen port would linger from one - // test to another causing failures. - checkShutdown := func(listen string) bool { - down := false - maxTime := time.Now().Add(5 * time.Second) - for time.Now().Before(maxTime) { - conn, err := net.Dial("tcp", listen) - if err != nil { - down = true - break - } - conn.Close() - // Retry after 50ms - time.Sleep(50 * time.Millisecond) - } - return down - } - if listenSpec != "" { - if !checkShutdown(listenSpec) { - return false - } - } - if routeListenSpec != "" { - if !checkShutdown(routeListenSpec) { - return false - } - } - return true -} - func runRouteServer(t *testing.T) (*server.Server, *server.Options) { return RunServerWithConfig("./configs/cluster.conf") } @@ -196,9 +159,7 @@ func TestSendRouteSubAndUnsub(t *testing.T) { // Explicitly shutdown the server, otherwise this test would // cause following test to fail. - if down := shutdownServerAndWait(t, s); !down { - t.Fatal("Unable to verify server was shutdown") - } + s.Shutdown() } func TestSendRouteSolicit(t *testing.T) { @@ -349,12 +310,6 @@ func TestRouteQueueSemantics(t *testing.T) { defer client.Close() - // Make sure client connection is fully processed before creating route - // connection, so we are sure that client ID will be "2" ("1" being used - // by the connection created to check the server is started) - clientSend("PING\r\n") - clientExpect(pongRe) - route := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port) defer route.Close() @@ -364,9 +319,9 @@ func TestRouteQueueSemantics(t *testing.T) { expectMsgs := expectMsgsCommand(t, routeExpect) // Express multiple interest on this route for foo, queue group bar. - qrsid1 := "QRSID:2:1" + qrsid1 := "QRSID:1:1" routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid1)) - qrsid2 := "QRSID:2:2" + qrsid2 := "QRSID:1:2" routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid2)) // Use ping roundtrip to make sure its processed. @@ -384,7 +339,7 @@ func TestRouteQueueSemantics(t *testing.T) { checkMsg(t, matches[0], "foo", "", "", "2", "ok") // Add normal Interest as well to route interest. - routeSend("SUB foo RSID:2:4\r\n") + routeSend("SUB foo RSID:1:4\r\n") // Use ping roundtrip to make sure its processed. routeSend("PING\r\n") @@ -400,8 +355,8 @@ func TestRouteQueueSemantics(t *testing.T) { matches = expectMsgs(2) // Expect first to be the normal subscriber, next will be the queue one. - if string(matches[0][sidIndex]) != "RSID:2:4" && - string(matches[1][sidIndex]) != "RSID:2:4" { + if string(matches[0][sidIndex]) != "RSID:1:4" && + string(matches[1][sidIndex]) != "RSID:1:4" { t.Fatalf("Did not received routed sid\n") } checkMsg(t, matches[0], "foo", "", "", "2", "ok") @@ -432,9 +387,9 @@ func TestRouteQueueSemantics(t *testing.T) { routeExpect(subRe) // Deliver a MSG from the route itself, make sure the client receives both. - routeSend("MSG foo RSID:2:1 2\r\nok\r\n") + routeSend("MSG foo RSID:1:1 2\r\nok\r\n") // Queue group one. - routeSend("MSG foo QRSID:2:2 2\r\nok\r\n") + routeSend("MSG foo QRSID:1:2 2\r\nok\r\n") // Use ping roundtrip to make sure its processed. routeSend("PING\r\n") diff --git a/test/test.go b/test/test.go index f4166942..9dd485d1 100644 --- a/test/test.go +++ b/test/test.go @@ -97,28 +97,11 @@ func RunServerWithAuth(opts *server.Options, auth server.Auth) *server.Server { // Run server in Go routine. go s.Start() - end := time.Now().Add(10 * time.Second) - for time.Now().Before(end) { - addr := s.GetListenEndpoint() - if addr == "" { - time.Sleep(50 * time.Millisecond) - // Retry. We might take a little while to open a connection. - continue - } - conn, err := net.Dial("tcp", addr) - if err != nil { - // Retry after 50ms - time.Sleep(50 * time.Millisecond) - continue - } - conn.Close() - // Wait a bit to give a chance to the server to remove this - // "client" from its state, which may otherwise interfere with - // some tests. - time.Sleep(25 * time.Millisecond) - return s + // Wait for accept loop(s) to be started + if !s.ReadyForConnections(10 * time.Second) { + panic("Unable to start NATS Server in Go Routine") } - panic("Unable to start NATS Server in Go Routine") + return s } func stackFatalf(t tLogger, f string, args ...interface{}) {