diff --git a/server/monitor_test.go b/server/monitor_test.go index d679c1e2..6f555981 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "net/http" "net/url" + "runtime" "strings" "testing" "time" @@ -382,6 +383,12 @@ func TestConnzLastActivity(t *testing.T) { t.Fatalf("Expected LastActivity to be valid\n") } + // On Windows, looks like the precision is too low, and if we + // don't wait, first and last would be equal. + if runtime.GOOS == "windows" { + time.Sleep(100 * time.Millisecond) + } + // Sub should trigger update. nc.Subscribe("hello.world", func(m *nats.Msg) {}) nc.Flush() @@ -390,6 +397,13 @@ func TestConnzLastActivity(t *testing.T) { if firstLast.Equal(subLast) { t.Fatalf("Subscribe should have triggered update to LastActivity\n") } + + // On Windows, looks like the precision is too low, and if we + // don't wait, first and last would be equal. + if runtime.GOOS == "windows" { + time.Sleep(100 * time.Millisecond) + } + // Pub should trigger as well nc.Publish("foo", []byte("Hello")) nc.Flush() @@ -398,6 +412,13 @@ func TestConnzLastActivity(t *testing.T) { if subLast.Equal(pubLast) { t.Fatalf("Publish should have triggered update to LastActivity\n") } + + // On Windows, looks like the precision is too low, and if we + // don't wait, first and last would be equal. + if runtime.GOOS == "windows" { + time.Sleep(100 * time.Millisecond) + } + // Message delivery should trigger as well nc2.Publish("foo", []byte("Hello")) nc2.Flush() diff --git a/server/pse_test.go b/server/pse_test.go index 51991abc..c3948863 100644 --- a/server/pse_test.go +++ b/server/pse_test.go @@ -6,10 +6,14 @@ import ( "fmt" "os" "os/exec" + "runtime" "testing" ) func TestPSEmulation(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skipf("Skipping this test on Windows") + } var rss, vss, psRss, psVss int64 var pcpu, psPcpu float64 diff --git a/server/route.go b/server/route.go index 39c983e9..e5180421 100644 --- a/server/route.go +++ b/server/route.go @@ -429,6 +429,10 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { sendInfo := false s.mu.Lock() + if !s.running { + s.mu.Unlock() + return false, false + } remote, exists := s.remotes[id] if !exists { s.routes[c.cid] = c diff --git a/server/server.go b/server/server.go index 4a4d920c..f8b3c98b 100644 --- a/server/server.go +++ b/server/server.go @@ -711,8 +711,8 @@ func (s *Server) Addr() net.Addr { } // GetListenEndpoint will return a string of the form host:port suitable for -// a connect. Will return nil if the server is not ready to accept client -// connections. +// a connect. Will return empty string if the server is not ready to accept +// client connections. func (s *Server) GetListenEndpoint() string { s.mu.Lock() defer s.mu.Unlock() @@ -734,6 +734,29 @@ func (s *Server) GetListenEndpoint() string { return net.JoinHostPort(host, strconv.Itoa(s.opts.Port)) } +// GetRouteListenEndpoint will return a string of the form host:port suitable +// for a connect. Will return empty string if the server is not configured for +// routing or not ready to accept route connections. +func (s *Server) GetRouteListenEndpoint() string { + s.mu.Lock() + defer s.mu.Unlock() + + if s.routeListener == nil { + return "" + } + + host := s.opts.ClusterHost + + // On windows, a connect with host "0.0.0.0" (or "::") will fail. + // We replace it with "localhost" when that's the case. + if host == "0.0.0.0" || host == "::" || host == "[::]" { + host = "localhost" + } + + // Return the cluster's Host and Port. + return net.JoinHostPort(host, strconv.Itoa(s.opts.ClusterPort)) +} + // Server's ID func (s *Server) Id() string { s.mu.Lock() diff --git a/test/client_cluster_test.go b/test/client_cluster_test.go index b4a27163..ce145bcf 100644 --- a/test/client_cluster_test.go +++ b/test/client_cluster_test.go @@ -39,6 +39,8 @@ func TestServerRestartReSliceIssue(t *testing.T) { reconnectsDone <- true } + clients := make([]*nats.Conn, numClients) + // Create 20 random clients. // Half connected to A and half to B.. for i := 0; i < numClients; i++ { @@ -47,6 +49,7 @@ func TestServerRestartReSliceIssue(t *testing.T) { if err != nil { t.Fatalf("Failed to create connection: %v\n", err) } + clients[i] = nc defer nc.Close() // Create 10 subscriptions each.. @@ -79,11 +82,26 @@ func TestServerRestartReSliceIssue(t *testing.T) { srvB = RunServer(optsB) defer srvB.Shutdown() - select { - case <-reconnectsDone: - break - case <-time.After(3 * time.Second): - t.Fatalf("Expected %d reconnects, got %d\n", numClients/2, reconnects) + // Check that all expected clients have reconnected + for i := 0; i < numClients/2; i++ { + select { + case <-reconnectsDone: + break + case <-time.After(3 * time.Second): + t.Fatalf("Expected %d reconnects, got %d\n", numClients/2, reconnects) + } + } + + // Since srvB was restarted, its defer Shutdown() was last, so will + // exectue first, which would cause clients that have reconnected to + // it to try to reconnect (causing delays on Windows). So let's + // explicitly close them here. + // NOTE: With fix of NATS GO client (reconnect loop yields to Close()), + // this change would not be required, however, it still speeeds up + // the test, from more than 7s to less than one. + for i := 0; i < numClients; i++ { + nc := clients[i] + nc.Close() } } @@ -211,6 +229,11 @@ func TestServerRestartAndQueueSubs(t *testing.T) { // Base Test //////////////////////////////////////////////////////////////////////////// + // Make sure subscriptions are propagated in the cluster + if err := checkExpectedSubs(4, srvA, srvB); err != nil { + t.Fatalf("%v", err) + } + // Now send 10 messages, from each client.. sendAndCheckMsgs(10) @@ -228,10 +251,23 @@ func TestServerRestartAndQueueSubs(t *testing.T) { waitOnReconnect() + // Make sure the cluster is reformed checkClusterFormed(t, srvA, srvB) + // Make sure subscriptions are propagated in the cluster + if err := checkExpectedSubs(4, srvA, srvB); err != nil { + t.Fatalf("%v", err) + } + // Now send another 10 messages, from each client.. sendAndCheckMsgs(10) + + // Since servers are restarted after all client's close defer calls, + // their defer Shutdown() are last, and so will be executed first, + // which would cause clients to try to reconnect on exit, causing + // delays on Windows. So let's explicitly close them here. + c1.Close() + c2.Close() } // This will test request semantics across a route diff --git a/test/cluster_test.go b/test/cluster_test.go index 8d7ec11f..7b93ce98 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -3,6 +3,7 @@ package test import ( + "errors" "fmt" "runtime" "testing" @@ -36,6 +37,31 @@ func checkClusterFormed(t *testing.T, servers ...*server.Server) { } } +// Helper function to check that a server (or list of servers) have the +// expected number of subscriptions +func checkExpectedSubs(expected int, servers ...*server.Server) error { + var err string + maxTime := time.Now().Add(5 * time.Second) + for time.Now().Before(maxTime) { + err = "" + for _, s := range servers { + if numSubs := int(s.NumSubscriptions()); numSubs != expected { + err = fmt.Sprintf("Expected %d subscriptions for server %q, got %d", expected, s.Id(), numSubs) + break + } + } + if err != "" { + time.Sleep(100 * time.Millisecond) + } else { + break + } + } + if err != "" { + return errors.New(err) + } + return nil +} + func runServers(t *testing.T) (srvA, srvB *server.Server, optsA, optsB *server.Options) { srvA, optsA = RunServerWithConfig("./configs/srv_a.conf") srvB, optsB = RunServerWithConfig("./configs/srv_b.conf") @@ -120,6 +146,11 @@ func TestClusterQueueSubs(t *testing.T) { sendA("PING\r\n") expectA(pongRe) + // Make sure the subs have propagated to srvB before continuing + if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil { + t.Fatalf("%v", err) + } + sendB("PUB foo 2\r\nok\r\n") sendB("PING\r\n") expectB(pongRe) @@ -142,6 +173,11 @@ func TestClusterQueueSubs(t *testing.T) { sendA("PING\r\n") expectA(pongRe) + // Make sure the subs have propagated to srvB before continuing + if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids), srvB); err != nil { + t.Fatalf("%v", err) + } + // Send to B sendB("PUB foo 2\r\nok\r\n") sendB("PING\r\n") @@ -168,6 +204,11 @@ func TestClusterQueueSubs(t *testing.T) { sendB("PING\r\n") expectB(pongRe) + // Make sure the subs have propagated to srvA before continuing + if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids)+len(qg2Sids_b), srvA); err != nil { + t.Fatalf("%v", err) + } + // Send to B sendB("PUB foo 2\r\nok\r\n") @@ -187,6 +228,11 @@ func TestClusterQueueSubs(t *testing.T) { sendA("PING\r\n") expectA(pongRe) + // Make sure the subs have propagated to srvB before continuing + if err := checkExpectedSubs(len(pSids)+len(qg2Sids_b), srvB); err != nil { + t.Fatalf("%v", err) + } + // Send to B sendB("PUB foo 2\r\nok\r\n") @@ -215,8 +261,6 @@ func TestClusterDoubleMsgs(t *testing.T) { defer srvA.Shutdown() defer srvB.Shutdown() - time.Sleep(50 * time.Millisecond) - clientA1 := createClientConn(t, optsA.Host, optsA.Port) defer clientA1.Close() @@ -243,6 +287,11 @@ func TestClusterDoubleMsgs(t *testing.T) { sendA1("PING\r\n") expectA1(pongRe) + // Make sure the subs have propagated to srvB before continuing + if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil { + t.Fatalf("%v", err) + } + sendB("PUB foo 2\r\nok\r\n") sendB("PING\r\n") expectB(pongRe) @@ -259,6 +308,11 @@ func TestClusterDoubleMsgs(t *testing.T) { expectA2(pongRe) pSids := []string{"1", "2"} + // Make sure the subs have propagated to srvB before continuing + if err := checkExpectedSubs(len(qg1Sids_a)+2, srvB); err != nil { + t.Fatalf("%v", err) + } + sendB("PUB foo 2\r\nok\r\n") sendB("PING\r\n") expectB(pongRe) diff --git a/test/maxpayload_test.go b/test/maxpayload_test.go index 829d82e3..45e4a236 100644 --- a/test/maxpayload_test.go +++ b/test/maxpayload_test.go @@ -5,8 +5,10 @@ package test import ( "fmt" "net" + "runtime" "strings" "testing" + "time" "github.com/nats-io/nats" ) @@ -34,6 +36,7 @@ func TestMaxPayload(t *testing.T) { if err != nil { t.Fatalf("Could not make a raw connection to the server: %v", err) } + defer conn.Close() info := make([]byte, 512) _, err = conn.Read(info) if err != nil { @@ -65,7 +68,21 @@ func TestMaxPayload(t *testing.T) { // publishing the bytes following what is suggested by server // in the info message has its connection closed. _, err = conn.Write(big) - if err == nil { + if err == nil && runtime.GOOS != "windows" { t.Errorf("Expected error due to maximum payload transgression.") } + + // On windows, the previous write will not fail because the connection + // is not fully closed at this stage. + if runtime.GOOS == "windows" { + // Issuing a PING and not expecting the PONG. + _, err = conn.Write([]byte("PING\r\n")) + if err == nil { + conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + _, err = conn.Read(big) + if err == nil { + t.Errorf("Expected closed connection due to maximum payload transgression.") + } + } + } } diff --git a/test/routes_test.go b/test/routes_test.go index e0e39d28..ad6e6a13 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net" "runtime" "strings" "testing" @@ -14,6 +15,43 @@ import ( "github.com/nats-io/gnatsd/server" ) +func shutdownServerAndWait(t *testing.T, s *server.Server) bool { + listenSpec := s.GetListenEndpoint() + routeListenSpec := s.GetRouteListenEndpoint() + + s.Shutdown() + + // For now, do this only on Windows. Lots of tests would fail + // without this because the listen port would linger from one + // test to another causing failures. + checkShutdown := func(listen string) bool { + down := false + maxTime := time.Now().Add(5 * time.Second) + for time.Now().Before(maxTime) { + conn, err := net.Dial("tcp", listen) + if err != nil { + down = true + break + } + conn.Close() + // Retry after 50ms + time.Sleep(50 * time.Millisecond) + } + return down + } + if listenSpec != "" { + if !checkShutdown(listenSpec) { + return false + } + } + if routeListenSpec != "" { + if !checkShutdown(routeListenSpec) { + return false + } + } + return true +} + func runRouteServer(t *testing.T) (*server.Server, *server.Options) { return RunServerWithConfig("./configs/cluster.conf") } @@ -146,6 +184,12 @@ func TestSendRouteSubAndUnsub(t *testing.T) { if rsid2 != rsid { t.Fatalf("Expected rsid's to match. %q vs %q\n", rsid, rsid2) } + + // Explicitly shutdown the server, otherwise this test would + // cause following test to fail. + if down := shutdownServerAndWait(t, s); !down { + t.Fatal("Unable to verify server was shutdown") + } } func TestSendRouteSolicit(t *testing.T) { diff --git a/test/tls_test.go b/test/tls_test.go index 01e08c0f..80cb771c 100644 --- a/test/tls_test.go +++ b/test/tls_test.go @@ -151,7 +151,7 @@ func stressConnect(t *testing.T, wg *sync.WaitGroup, errCh chan error, url strin subName := fmt.Sprintf("foo.%d", index) - for i := 0; i < 100; i++ { + for i := 0; i < 33; i++ { nc, err := nats.Connect(url, nats.RootCAs("./configs/certs/ca.pem")) if err != nil { errCh <- fmt.Errorf("Unable to create TLS connection: %v\n", err)