Fixed code and tests to run on Windows

Mainly tests, but also a fix in route.go to reject a route when the
server is being shut down.
This commit is contained in:
Ivan Kozlovic
2016-03-07 18:47:20 -07:00
parent adb2a059b5
commit 6263c66a40
9 changed files with 214 additions and 11 deletions

View File

@@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"runtime"
"strings"
"testing"
"time"
@@ -382,6 +383,12 @@ func TestConnzLastActivity(t *testing.T) {
t.Fatalf("Expected LastActivity to be valid\n")
}
// On Windows, looks like the precision is too low, and if we
// don't wait, first and last would be equal.
if runtime.GOOS == "windows" {
time.Sleep(100 * time.Millisecond)
}
// Sub should trigger update.
nc.Subscribe("hello.world", func(m *nats.Msg) {})
nc.Flush()
@@ -390,6 +397,13 @@ func TestConnzLastActivity(t *testing.T) {
if firstLast.Equal(subLast) {
t.Fatalf("Subscribe should have triggered update to LastActivity\n")
}
// On Windows, looks like the precision is too low, and if we
// don't wait, first and last would be equal.
if runtime.GOOS == "windows" {
time.Sleep(100 * time.Millisecond)
}
// Pub should trigger as well
nc.Publish("foo", []byte("Hello"))
nc.Flush()
@@ -398,6 +412,13 @@ func TestConnzLastActivity(t *testing.T) {
if subLast.Equal(pubLast) {
t.Fatalf("Publish should have triggered update to LastActivity\n")
}
// On Windows, looks like the precision is too low, and if we
// don't wait, first and last would be equal.
if runtime.GOOS == "windows" {
time.Sleep(100 * time.Millisecond)
}
// Message delivery should trigger as well
nc2.Publish("foo", []byte("Hello"))
nc2.Flush()

View File

@@ -6,10 +6,14 @@ import (
"fmt"
"os"
"os/exec"
"runtime"
"testing"
)
func TestPSEmulation(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skipf("Skipping this test on Windows")
}
var rss, vss, psRss, psVss int64
var pcpu, psPcpu float64

View File

@@ -429,6 +429,10 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
sendInfo := false
s.mu.Lock()
if !s.running {
s.mu.Unlock()
return false, false
}
remote, exists := s.remotes[id]
if !exists {
s.routes[c.cid] = c

View File

@@ -711,8 +711,8 @@ func (s *Server) Addr() net.Addr {
}
// GetListenEndpoint will return a string of the form host:port suitable for
// a connect. Will return nil if the server is not ready to accept client
// connections.
// a connect. Will return empty string if the server is not ready to accept
// client connections.
func (s *Server) GetListenEndpoint() string {
s.mu.Lock()
defer s.mu.Unlock()
@@ -734,6 +734,29 @@ func (s *Server) GetListenEndpoint() string {
return net.JoinHostPort(host, strconv.Itoa(s.opts.Port))
}
// GetRouteListenEndpoint will return a string of the form host:port suitable
// for a connect. Will return empty string if the server is not configured for
// routing or not ready to accept route connections.
// GetRouteListenEndpoint will return a string of the form host:port suitable
// for a connect. Will return empty string if the server is not configured for
// routing or not ready to accept route connections.
func (s *Server) GetRouteListenEndpoint() string {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.routeListener == nil {
		return ""
	}

	// A connect to the wildcard addresses "0.0.0.0"/"::" fails on
	// Windows, so substitute "localhost" in that case.
	host := s.opts.ClusterHost
	switch host {
	case "0.0.0.0", "::", "[::]":
		host = "localhost"
	}

	// Return the cluster's Host and Port.
	return net.JoinHostPort(host, strconv.Itoa(s.opts.ClusterPort))
}
// Server's ID
func (s *Server) Id() string {
s.mu.Lock()

View File

@@ -39,6 +39,8 @@ func TestServerRestartReSliceIssue(t *testing.T) {
reconnectsDone <- true
}
clients := make([]*nats.Conn, numClients)
// Create 20 random clients.
// Half connected to A and half to B..
for i := 0; i < numClients; i++ {
@@ -47,6 +49,7 @@ func TestServerRestartReSliceIssue(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create connection: %v\n", err)
}
clients[i] = nc
defer nc.Close()
// Create 10 subscriptions each..
@@ -79,11 +82,26 @@ func TestServerRestartReSliceIssue(t *testing.T) {
srvB = RunServer(optsB)
defer srvB.Shutdown()
select {
case <-reconnectsDone:
break
case <-time.After(3 * time.Second):
t.Fatalf("Expected %d reconnects, got %d\n", numClients/2, reconnects)
// Check that all expected clients have reconnected
for i := 0; i < numClients/2; i++ {
select {
case <-reconnectsDone:
break
case <-time.After(3 * time.Second):
t.Fatalf("Expected %d reconnects, got %d\n", numClients/2, reconnects)
}
}
// Since srvB was restarted, its defer Shutdown() was last, so will
// execute first, which would cause clients that have reconnected to
// it to try to reconnect (causing delays on Windows). So let's
// explicitly close them here.
// NOTE: With the fix in the NATS Go client (reconnect loop yields to
// Close()), this change would not be required; however, it still speeds
// up the test, from more than 7s to less than one.
for i := 0; i < numClients; i++ {
nc := clients[i]
nc.Close()
}
}
@@ -211,6 +229,11 @@ func TestServerRestartAndQueueSubs(t *testing.T) {
// Base Test
////////////////////////////////////////////////////////////////////////////
// Make sure subscriptions are propagated in the cluster
if err := checkExpectedSubs(4, srvA, srvB); err != nil {
t.Fatalf("%v", err)
}
// Now send 10 messages, from each client..
sendAndCheckMsgs(10)
@@ -228,10 +251,23 @@ func TestServerRestartAndQueueSubs(t *testing.T) {
waitOnReconnect()
// Make sure the cluster is reformed
checkClusterFormed(t, srvA, srvB)
// Make sure subscriptions are propagated in the cluster
if err := checkExpectedSubs(4, srvA, srvB); err != nil {
t.Fatalf("%v", err)
}
// Now send another 10 messages, from each client..
sendAndCheckMsgs(10)
// Since servers are restarted after all clients' close defer calls,
// their defer Shutdown() are last, and so will be executed first,
// which would cause clients to try to reconnect on exit, causing
// delays on Windows. So let's explicitly close them here.
c1.Close()
c2.Close()
}
// This will test request semantics across a route

View File

@@ -3,6 +3,7 @@
package test
import (
"errors"
"fmt"
"runtime"
"testing"
@@ -36,6 +37,31 @@ func checkClusterFormed(t *testing.T, servers ...*server.Server) {
}
}
// Helper function to check that a server (or list of servers) have the
// expected number of subscriptions
// checkExpectedSubs waits up to 5 seconds for each of the given servers
// to report exactly `expected` subscriptions. It returns nil once every
// server matches, or an error describing the last observed mismatch if
// the deadline expires.
func checkExpectedSubs(expected int, servers ...*server.Server) error {
	deadline := time.Now().Add(5 * time.Second)
	var lastErr string
	for {
		lastErr = ""
		for _, srv := range servers {
			if got := int(srv.NumSubscriptions()); got != expected {
				lastErr = fmt.Sprintf("Expected %d subscriptions for server %q, got %d", expected, srv.Id(), got)
				break
			}
		}
		// Done when all servers match, or when time is up.
		if lastErr == "" || !time.Now().Before(deadline) {
			break
		}
		// Retry after a short pause.
		time.Sleep(100 * time.Millisecond)
	}
	if lastErr != "" {
		return errors.New(lastErr)
	}
	return nil
}
func runServers(t *testing.T) (srvA, srvB *server.Server, optsA, optsB *server.Options) {
srvA, optsA = RunServerWithConfig("./configs/srv_a.conf")
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")
@@ -120,6 +146,11 @@ func TestClusterQueueSubs(t *testing.T) {
sendA("PING\r\n")
expectA(pongRe)
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil {
t.Fatalf("%v", err)
}
sendB("PUB foo 2\r\nok\r\n")
sendB("PING\r\n")
expectB(pongRe)
@@ -142,6 +173,11 @@ func TestClusterQueueSubs(t *testing.T) {
sendA("PING\r\n")
expectA(pongRe)
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids), srvB); err != nil {
t.Fatalf("%v", err)
}
// Send to B
sendB("PUB foo 2\r\nok\r\n")
sendB("PING\r\n")
@@ -168,6 +204,11 @@ func TestClusterQueueSubs(t *testing.T) {
sendB("PING\r\n")
expectB(pongRe)
// Make sure the subs have propagated to srvA before continuing
if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids)+len(qg2Sids_b), srvA); err != nil {
t.Fatalf("%v", err)
}
// Send to B
sendB("PUB foo 2\r\nok\r\n")
@@ -187,6 +228,11 @@ func TestClusterQueueSubs(t *testing.T) {
sendA("PING\r\n")
expectA(pongRe)
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(pSids)+len(qg2Sids_b), srvB); err != nil {
t.Fatalf("%v", err)
}
// Send to B
sendB("PUB foo 2\r\nok\r\n")
@@ -215,8 +261,6 @@ func TestClusterDoubleMsgs(t *testing.T) {
defer srvA.Shutdown()
defer srvB.Shutdown()
time.Sleep(50 * time.Millisecond)
clientA1 := createClientConn(t, optsA.Host, optsA.Port)
defer clientA1.Close()
@@ -243,6 +287,11 @@ func TestClusterDoubleMsgs(t *testing.T) {
sendA1("PING\r\n")
expectA1(pongRe)
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil {
t.Fatalf("%v", err)
}
sendB("PUB foo 2\r\nok\r\n")
sendB("PING\r\n")
expectB(pongRe)
@@ -259,6 +308,11 @@ func TestClusterDoubleMsgs(t *testing.T) {
expectA2(pongRe)
pSids := []string{"1", "2"}
// Make sure the subs have propagated to srvB before continuing
if err := checkExpectedSubs(len(qg1Sids_a)+2, srvB); err != nil {
t.Fatalf("%v", err)
}
sendB("PUB foo 2\r\nok\r\n")
sendB("PING\r\n")
expectB(pongRe)

View File

@@ -5,8 +5,10 @@ package test
import (
"fmt"
"net"
"runtime"
"strings"
"testing"
"time"
"github.com/nats-io/nats"
)
@@ -34,6 +36,7 @@ func TestMaxPayload(t *testing.T) {
if err != nil {
t.Fatalf("Could not make a raw connection to the server: %v", err)
}
defer conn.Close()
info := make([]byte, 512)
_, err = conn.Read(info)
if err != nil {
@@ -65,7 +68,21 @@ func TestMaxPayload(t *testing.T) {
// publishing the bytes following what is suggested by server
// in the info message has its connection closed.
_, err = conn.Write(big)
if err == nil {
if err == nil && runtime.GOOS != "windows" {
t.Errorf("Expected error due to maximum payload transgression.")
}
// On windows, the previous write will not fail because the connection
// is not fully closed at this stage.
if runtime.GOOS == "windows" {
// Issuing a PING and not expecting the PONG.
_, err = conn.Write([]byte("PING\r\n"))
if err == nil {
conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
_, err = conn.Read(big)
if err == nil {
t.Errorf("Expected closed connection due to maximum payload transgression.")
}
}
}
}

View File

@@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"runtime"
"strings"
"testing"
@@ -14,6 +15,43 @@ import (
"github.com/nats-io/gnatsd/server"
)
// shutdownServerAndWait shuts the server down, then polls its client and
// route listen endpoints until TCP connects are refused, confirming the
// listen sockets have actually been released. Returns true when every
// configured endpoint is confirmed down within 5 seconds per endpoint.
// This is mainly needed on Windows, where a listen port lingering from
// one test can cause failures in the next.
func shutdownServerAndWait(t *testing.T, s *server.Server) bool {
	clientSpec := s.GetListenEndpoint()
	routeSpec := s.GetRouteListenEndpoint()

	s.Shutdown()

	// As long as a TCP dial to the endpoint succeeds, the listener is
	// still up; keep retrying every 50ms until it fails or we time out.
	waitForDown := func(addr string) bool {
		deadline := time.Now().Add(5 * time.Second)
		for time.Now().Before(deadline) {
			conn, err := net.Dial("tcp", addr)
			if err != nil {
				return true
			}
			conn.Close()
			time.Sleep(50 * time.Millisecond)
		}
		return false
	}
	if clientSpec != "" && !waitForDown(clientSpec) {
		return false
	}
	if routeSpec != "" && !waitForDown(routeSpec) {
		return false
	}
	return true
}
// runRouteServer starts a server using the cluster (routing) test
// configuration and returns it along with its parsed options.
func runRouteServer(t *testing.T) (*server.Server, *server.Options) {
	srv, opts := RunServerWithConfig("./configs/cluster.conf")
	return srv, opts
}
@@ -146,6 +184,12 @@ func TestSendRouteSubAndUnsub(t *testing.T) {
if rsid2 != rsid {
t.Fatalf("Expected rsid's to match. %q vs %q\n", rsid, rsid2)
}
// Explicitly shutdown the server, otherwise this test would
// cause following test to fail.
if down := shutdownServerAndWait(t, s); !down {
t.Fatal("Unable to verify server was shutdown")
}
}
func TestSendRouteSolicit(t *testing.T) {

View File

@@ -151,7 +151,7 @@ func stressConnect(t *testing.T, wg *sync.WaitGroup, errCh chan error, url strin
subName := fmt.Sprintf("foo.%d", index)
for i := 0; i < 100; i++ {
for i := 0; i < 33; i++ {
nc, err := nats.Connect(url, nats.RootCAs("./configs/certs/ca.pem"))
if err != nil {
errCh <- fmt.Errorf("Unable to create TLS connection: %v\n", err)