Replace GetListenEndpoint() with ReadyForConnections()

The RunServer() function (and the various variants)
call Server.Start() in a go-routine, but do not return until
it has verified that the server is ready to accept connections.
To do so, it use GetListenEndpoint() to get a suitable connect
address (replacing "0.0.0.0" or "::" with localhost - important
on Windows). It then creates a raw TCP connection to ensure the
server is started, repeating the process in case of failure up
to 10 seconds.

This PR replaces this with a function that checks that client
listener, and route listener if configured, are set. This removes
the need to get a connect address and create test tcp connections.

The reason for this change is that NATS Streaming when starting
the NATS Server (unless configured to connect to a remote one)
calls RunServerWithAuth(), which when getting "localhost" from
GetListenEndpoint(), would fail trying to resolve it. This happened
for the NATS Streaming Docker image built with Go 1.7+.
This commit is contained in:
Ivan Kozlovic
2016-12-09 13:35:31 -07:00
parent 0598bfa3b0
commit 5f471b6e7f
6 changed files with 46 additions and 147 deletions

View File

@@ -598,14 +598,14 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
return
}
// Let them know we are up
close(ch)
// Setup state that can enable shutdown
s.mu.Lock()
s.routeListener = l
s.mu.Unlock()
// Let them know we are up
close(ch)
tmpDelay := ACCEPT_MIN_SLEEP
for s.isRunning() {

View File

@@ -455,9 +455,19 @@ func TestRouteUseIPv6(t *testing.T) {
routeUp := false
timeout := time.Now().Add(5 * time.Second)
for time.Now().Before(timeout) && !routeUp {
if s.GetRouteListenEndpoint() == "" {
time.Sleep(time.Second)
continue
// We know that the server is local and listening to
// all IPv6 interfaces. Try connect using IPv6 loopback.
if conn, err := net.Dial("tcp", "[::1]:6222"); err != nil {
// Travis seem to have the server actually listening to 0.0.0.0,
// so try with 127.0.0.1
if conn, err := net.Dial("tcp", "127.0.0.1:6222"); err != nil {
time.Sleep(time.Second)
continue
} else {
conn.Close()
}
} else {
conn.Close()
}
routeUp = true
}

View File

@@ -839,51 +839,21 @@ func (s *Server) Addr() net.Addr {
return s.listener.Addr()
}
// GetListenEndpoint will return a string of the form host:port suitable for
// a connect. Will return empty string if the server is not ready to accept
// client connections.
func (s *Server) GetListenEndpoint() string {
s.mu.Lock()
defer s.mu.Unlock()
// Wait for the listener to be set, see note about RANDOM_PORT below
if s.listener == nil {
return ""
// ReadyForConnections returns `true` if the server is ready to accept client
// and, if routing is enabled, route connections. If after the duration
// `dur` the server is still not ready, returns `false`.
func (s *Server) ReadyForConnections(dur time.Duration) bool {
end := time.Now().Add(dur)
for time.Now().Before(end) {
s.mu.Lock()
ok := s.listener != nil && (s.opts.Cluster.Port == 0 || s.routeListener != nil)
s.mu.Unlock()
if ok {
return true
}
time.Sleep(25 * time.Millisecond)
}
host := s.opts.Host
// On windows, a connect with host "0.0.0.0" (or "::") will fail.
// We replace it with "localhost" when that's the case.
if host == "0.0.0.0" || host == "::" || host == "[::]" {
host = "localhost"
}
// Return the opts's Host and Port. Note that the Port may be set
// when the listener is started, due to the use of RANDOM_PORT
return net.JoinHostPort(host, strconv.Itoa(s.opts.Port))
}
// GetRouteListenEndpoint will return a string of the form host:port suitable
// for a connect. Will return empty string if the server is not configured for
// routing or not ready to accept route connections.
func (s *Server) GetRouteListenEndpoint() string {
s.mu.Lock()
defer s.mu.Unlock()
if s.routeListener == nil {
return ""
}
host := s.opts.Cluster.Host
// On windows, a connect with host "0.0.0.0" (or "::") will fail.
// We replace it with "localhost" when that's the case.
if host == "0.0.0.0" || host == "::" || host == "[::]" {
host = "localhost"
}
// Return the cluster's Host and Port.
return net.JoinHostPort(host, strconv.Itoa(s.opts.Cluster.Port))
return false
}
// ID returns the server's ID

View File

@@ -36,30 +36,11 @@ func RunServer(opts *Options) *Server {
// Run server in Go routine.
go s.Start()
end := time.Now().Add(10 * time.Second)
for time.Now().Before(end) {
addr := s.GetListenEndpoint()
if addr == "" {
time.Sleep(10 * time.Millisecond)
// Retry. We might take a little while to open a connection.
continue
}
conn, err := net.Dial("tcp", addr)
if err != nil {
// Retry after 50ms
time.Sleep(50 * time.Millisecond)
continue
}
conn.Close()
// Wait a bit to give a chance to the server to remove this
// "client" from its state, which may otherwise interfere with
// some tests.
time.Sleep(25 * time.Millisecond)
return s
// Wait for accept loop(s) to be started
if !s.ReadyForConnections(10 * time.Second) {
panic("Unable to start NATS Server in Go Routine")
}
panic("Unable to start NATS Server in Go Routine")
return s
}
func TestStartupAndShutdown(t *testing.T) {

View File

@@ -21,43 +21,6 @@ import (
const clientProtoInfo = 1
func shutdownServerAndWait(t *testing.T, s *server.Server) bool {
listenSpec := s.GetListenEndpoint()
routeListenSpec := s.GetRouteListenEndpoint()
s.Shutdown()
// For now, do this only on Windows. Lots of tests would fail
// without this because the listen port would linger from one
// test to another causing failures.
checkShutdown := func(listen string) bool {
down := false
maxTime := time.Now().Add(5 * time.Second)
for time.Now().Before(maxTime) {
conn, err := net.Dial("tcp", listen)
if err != nil {
down = true
break
}
conn.Close()
// Retry after 50ms
time.Sleep(50 * time.Millisecond)
}
return down
}
if listenSpec != "" {
if !checkShutdown(listenSpec) {
return false
}
}
if routeListenSpec != "" {
if !checkShutdown(routeListenSpec) {
return false
}
}
return true
}
func runRouteServer(t *testing.T) (*server.Server, *server.Options) {
return RunServerWithConfig("./configs/cluster.conf")
}
@@ -196,9 +159,7 @@ func TestSendRouteSubAndUnsub(t *testing.T) {
// Explicitly shutdown the server, otherwise this test would
// cause following test to fail.
if down := shutdownServerAndWait(t, s); !down {
t.Fatal("Unable to verify server was shutdown")
}
s.Shutdown()
}
func TestSendRouteSolicit(t *testing.T) {
@@ -349,12 +310,6 @@ func TestRouteQueueSemantics(t *testing.T) {
defer client.Close()
// Make sure client connection is fully processed before creating route
// connection, so we are sure that client ID will be "2" ("1" being used
// by the connection created to check the server is started)
clientSend("PING\r\n")
clientExpect(pongRe)
route := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port)
defer route.Close()
@@ -364,9 +319,9 @@ func TestRouteQueueSemantics(t *testing.T) {
expectMsgs := expectMsgsCommand(t, routeExpect)
// Express multiple interest on this route for foo, queue group bar.
qrsid1 := "QRSID:2:1"
qrsid1 := "QRSID:1:1"
routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid1))
qrsid2 := "QRSID:2:2"
qrsid2 := "QRSID:1:2"
routeSend(fmt.Sprintf("SUB foo bar %s\r\n", qrsid2))
// Use ping roundtrip to make sure its processed.
@@ -384,7 +339,7 @@ func TestRouteQueueSemantics(t *testing.T) {
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
// Add normal Interest as well to route interest.
routeSend("SUB foo RSID:2:4\r\n")
routeSend("SUB foo RSID:1:4\r\n")
// Use ping roundtrip to make sure its processed.
routeSend("PING\r\n")
@@ -400,8 +355,8 @@ func TestRouteQueueSemantics(t *testing.T) {
matches = expectMsgs(2)
// Expect first to be the normal subscriber, next will be the queue one.
if string(matches[0][sidIndex]) != "RSID:2:4" &&
string(matches[1][sidIndex]) != "RSID:2:4" {
if string(matches[0][sidIndex]) != "RSID:1:4" &&
string(matches[1][sidIndex]) != "RSID:1:4" {
t.Fatalf("Did not received routed sid\n")
}
checkMsg(t, matches[0], "foo", "", "", "2", "ok")
@@ -432,9 +387,9 @@ func TestRouteQueueSemantics(t *testing.T) {
routeExpect(subRe)
// Deliver a MSG from the route itself, make sure the client receives both.
routeSend("MSG foo RSID:2:1 2\r\nok\r\n")
routeSend("MSG foo RSID:1:1 2\r\nok\r\n")
// Queue group one.
routeSend("MSG foo QRSID:2:2 2\r\nok\r\n")
routeSend("MSG foo QRSID:1:2 2\r\nok\r\n")
// Use ping roundtrip to make sure its processed.
routeSend("PING\r\n")

View File

@@ -97,28 +97,11 @@ func RunServerWithAuth(opts *server.Options, auth server.Auth) *server.Server {
// Run server in Go routine.
go s.Start()
end := time.Now().Add(10 * time.Second)
for time.Now().Before(end) {
addr := s.GetListenEndpoint()
if addr == "" {
time.Sleep(50 * time.Millisecond)
// Retry. We might take a little while to open a connection.
continue
}
conn, err := net.Dial("tcp", addr)
if err != nil {
// Retry after 50ms
time.Sleep(50 * time.Millisecond)
continue
}
conn.Close()
// Wait a bit to give a chance to the server to remove this
// "client" from its state, which may otherwise interfere with
// some tests.
time.Sleep(25 * time.Millisecond)
return s
// Wait for accept loop(s) to be started
if !s.ReadyForConnections(10 * time.Second) {
panic("Unable to start NATS Server in Go Routine")
}
panic("Unable to start NATS Server in Go Routine")
return s
}
func stackFatalf(t tLogger, f string, args ...interface{}) {