mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #222 from nats-io/fixes_for_windows
Fixed code and tests to run on Windows
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -382,6 +383,12 @@ func TestConnzLastActivity(t *testing.T) {
|
||||
t.Fatalf("Expected LastActivity to be valid\n")
|
||||
}
|
||||
|
||||
// On Windows, looks like the precision is too low, and if we
|
||||
// don't wait, first and last would be equal.
|
||||
if runtime.GOOS == "windows" {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Sub should trigger update.
|
||||
nc.Subscribe("hello.world", func(m *nats.Msg) {})
|
||||
nc.Flush()
|
||||
@@ -390,6 +397,13 @@ func TestConnzLastActivity(t *testing.T) {
|
||||
if firstLast.Equal(subLast) {
|
||||
t.Fatalf("Subscribe should have triggered update to LastActivity\n")
|
||||
}
|
||||
|
||||
// On Windows, looks like the precision is too low, and if we
|
||||
// don't wait, first and last would be equal.
|
||||
if runtime.GOOS == "windows" {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Pub should trigger as well
|
||||
nc.Publish("foo", []byte("Hello"))
|
||||
nc.Flush()
|
||||
@@ -398,6 +412,13 @@ func TestConnzLastActivity(t *testing.T) {
|
||||
if subLast.Equal(pubLast) {
|
||||
t.Fatalf("Publish should have triggered update to LastActivity\n")
|
||||
}
|
||||
|
||||
// On Windows, looks like the precision is too low, and if we
|
||||
// don't wait, first and last would be equal.
|
||||
if runtime.GOOS == "windows" {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Message delivery should trigger as well
|
||||
nc2.Publish("foo", []byte("Hello"))
|
||||
nc2.Flush()
|
||||
|
||||
@@ -6,10 +6,14 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestPSEmulation(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skipf("Skipping this test on Windows")
|
||||
}
|
||||
var rss, vss, psRss, psVss int64
|
||||
var pcpu, psPcpu float64
|
||||
|
||||
|
||||
@@ -429,6 +429,10 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
|
||||
sendInfo := false
|
||||
|
||||
s.mu.Lock()
|
||||
if !s.running {
|
||||
s.mu.Unlock()
|
||||
return false, false
|
||||
}
|
||||
remote, exists := s.remotes[id]
|
||||
if !exists {
|
||||
s.routes[c.cid] = c
|
||||
|
||||
@@ -711,8 +711,8 @@ func (s *Server) Addr() net.Addr {
|
||||
}
|
||||
|
||||
// GetListenEndpoint will return a string of the form host:port suitable for
|
||||
// a connect. Will return nil if the server is not ready to accept client
|
||||
// connections.
|
||||
// a connect. Will return empty string if the server is not ready to accept
|
||||
// client connections.
|
||||
func (s *Server) GetListenEndpoint() string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
@@ -734,6 +734,29 @@ func (s *Server) GetListenEndpoint() string {
|
||||
return net.JoinHostPort(host, strconv.Itoa(s.opts.Port))
|
||||
}
|
||||
|
||||
// GetRouteListenEndpoint will return a string of the form host:port suitable
|
||||
// for a connect. Will return empty string if the server is not configured for
|
||||
// routing or not ready to accept route connections.
|
||||
func (s *Server) GetRouteListenEndpoint() string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.routeListener == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
host := s.opts.ClusterHost
|
||||
|
||||
// On windows, a connect with host "0.0.0.0" (or "::") will fail.
|
||||
// We replace it with "localhost" when that's the case.
|
||||
if host == "0.0.0.0" || host == "::" || host == "[::]" {
|
||||
host = "localhost"
|
||||
}
|
||||
|
||||
// Return the cluster's Host and Port.
|
||||
return net.JoinHostPort(host, strconv.Itoa(s.opts.ClusterPort))
|
||||
}
|
||||
|
||||
// Server's ID
|
||||
func (s *Server) Id() string {
|
||||
s.mu.Lock()
|
||||
|
||||
@@ -39,6 +39,8 @@ func TestServerRestartReSliceIssue(t *testing.T) {
|
||||
reconnectsDone <- true
|
||||
}
|
||||
|
||||
clients := make([]*nats.Conn, numClients)
|
||||
|
||||
// Create 20 random clients.
|
||||
// Half connected to A and half to B..
|
||||
for i := 0; i < numClients; i++ {
|
||||
@@ -47,6 +49,7 @@ func TestServerRestartReSliceIssue(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create connection: %v\n", err)
|
||||
}
|
||||
clients[i] = nc
|
||||
defer nc.Close()
|
||||
|
||||
// Create 10 subscriptions each..
|
||||
@@ -79,11 +82,26 @@ func TestServerRestartReSliceIssue(t *testing.T) {
|
||||
srvB = RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
select {
|
||||
case <-reconnectsDone:
|
||||
break
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("Expected %d reconnects, got %d\n", numClients/2, reconnects)
|
||||
// Check that all expected clients have reconnected
|
||||
for i := 0; i < numClients/2; i++ {
|
||||
select {
|
||||
case <-reconnectsDone:
|
||||
break
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("Expected %d reconnects, got %d\n", numClients/2, reconnects)
|
||||
}
|
||||
}
|
||||
|
||||
// Since srvB was restarted, its defer Shutdown() was last, so will
|
||||
// exectue first, which would cause clients that have reconnected to
|
||||
// it to try to reconnect (causing delays on Windows). So let's
|
||||
// explicitly close them here.
|
||||
// NOTE: With fix of NATS GO client (reconnect loop yields to Close()),
|
||||
// this change would not be required, however, it still speeeds up
|
||||
// the test, from more than 7s to less than one.
|
||||
for i := 0; i < numClients; i++ {
|
||||
nc := clients[i]
|
||||
nc.Close()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,6 +229,11 @@ func TestServerRestartAndQueueSubs(t *testing.T) {
|
||||
// Base Test
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// Make sure subscriptions are propagated in the cluster
|
||||
if err := checkExpectedSubs(4, srvA, srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
// Now send 10 messages, from each client..
|
||||
sendAndCheckMsgs(10)
|
||||
|
||||
@@ -228,10 +251,23 @@ func TestServerRestartAndQueueSubs(t *testing.T) {
|
||||
|
||||
waitOnReconnect()
|
||||
|
||||
// Make sure the cluster is reformed
|
||||
checkClusterFormed(t, srvA, srvB)
|
||||
|
||||
// Make sure subscriptions are propagated in the cluster
|
||||
if err := checkExpectedSubs(4, srvA, srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
// Now send another 10 messages, from each client..
|
||||
sendAndCheckMsgs(10)
|
||||
|
||||
// Since servers are restarted after all client's close defer calls,
|
||||
// their defer Shutdown() are last, and so will be executed first,
|
||||
// which would cause clients to try to reconnect on exit, causing
|
||||
// delays on Windows. So let's explicitly close them here.
|
||||
c1.Close()
|
||||
c2.Close()
|
||||
}
|
||||
|
||||
// This will test request semantics across a route
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"testing"
|
||||
@@ -36,6 +37,31 @@ func checkClusterFormed(t *testing.T, servers ...*server.Server) {
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to check that a server (or list of servers) have the
|
||||
// expected number of subscriptions
|
||||
func checkExpectedSubs(expected int, servers ...*server.Server) error {
|
||||
var err string
|
||||
maxTime := time.Now().Add(5 * time.Second)
|
||||
for time.Now().Before(maxTime) {
|
||||
err = ""
|
||||
for _, s := range servers {
|
||||
if numSubs := int(s.NumSubscriptions()); numSubs != expected {
|
||||
err = fmt.Sprintf("Expected %d subscriptions for server %q, got %d", expected, s.Id(), numSubs)
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != "" {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != "" {
|
||||
return errors.New(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func runServers(t *testing.T) (srvA, srvB *server.Server, optsA, optsB *server.Options) {
|
||||
srvA, optsA = RunServerWithConfig("./configs/srv_a.conf")
|
||||
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")
|
||||
@@ -120,6 +146,11 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
sendA("PING\r\n")
|
||||
expectA(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
sendB("PUB foo 2\r\nok\r\n")
|
||||
sendB("PING\r\n")
|
||||
expectB(pongRe)
|
||||
@@ -142,6 +173,11 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
sendA("PING\r\n")
|
||||
expectA(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids), srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
// Send to B
|
||||
sendB("PUB foo 2\r\nok\r\n")
|
||||
sendB("PING\r\n")
|
||||
@@ -168,6 +204,11 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
sendB("PING\r\n")
|
||||
expectB(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvA before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a)+len(pSids)+len(qg2Sids_b), srvA); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
// Send to B
|
||||
sendB("PUB foo 2\r\nok\r\n")
|
||||
|
||||
@@ -187,6 +228,11 @@ func TestClusterQueueSubs(t *testing.T) {
|
||||
sendA("PING\r\n")
|
||||
expectA(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(pSids)+len(qg2Sids_b), srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
// Send to B
|
||||
sendB("PUB foo 2\r\nok\r\n")
|
||||
|
||||
@@ -215,8 +261,6 @@ func TestClusterDoubleMsgs(t *testing.T) {
|
||||
defer srvA.Shutdown()
|
||||
defer srvB.Shutdown()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
clientA1 := createClientConn(t, optsA.Host, optsA.Port)
|
||||
defer clientA1.Close()
|
||||
|
||||
@@ -243,6 +287,11 @@ func TestClusterDoubleMsgs(t *testing.T) {
|
||||
sendA1("PING\r\n")
|
||||
expectA1(pongRe)
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a), srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
sendB("PUB foo 2\r\nok\r\n")
|
||||
sendB("PING\r\n")
|
||||
expectB(pongRe)
|
||||
@@ -259,6 +308,11 @@ func TestClusterDoubleMsgs(t *testing.T) {
|
||||
expectA2(pongRe)
|
||||
pSids := []string{"1", "2"}
|
||||
|
||||
// Make sure the subs have propagated to srvB before continuing
|
||||
if err := checkExpectedSubs(len(qg1Sids_a)+2, srvB); err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
sendB("PUB foo 2\r\nok\r\n")
|
||||
sendB("PING\r\n")
|
||||
expectB(pongRe)
|
||||
|
||||
@@ -5,8 +5,10 @@ package test
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats"
|
||||
)
|
||||
@@ -34,6 +36,7 @@ func TestMaxPayload(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Could not make a raw connection to the server: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
info := make([]byte, 512)
|
||||
_, err = conn.Read(info)
|
||||
if err != nil {
|
||||
@@ -65,7 +68,21 @@ func TestMaxPayload(t *testing.T) {
|
||||
// publishing the bytes following what is suggested by server
|
||||
// in the info message has its connection closed.
|
||||
_, err = conn.Write(big)
|
||||
if err == nil {
|
||||
if err == nil && runtime.GOOS != "windows" {
|
||||
t.Errorf("Expected error due to maximum payload transgression.")
|
||||
}
|
||||
|
||||
// On windows, the previous write will not fail because the connection
|
||||
// is not fully closed at this stage.
|
||||
if runtime.GOOS == "windows" {
|
||||
// Issuing a PING and not expecting the PONG.
|
||||
_, err = conn.Write([]byte("PING\r\n"))
|
||||
if err == nil {
|
||||
conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
|
||||
_, err = conn.Read(big)
|
||||
if err == nil {
|
||||
t.Errorf("Expected closed connection due to maximum payload transgression.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
@@ -14,6 +15,43 @@ import (
|
||||
"github.com/nats-io/gnatsd/server"
|
||||
)
|
||||
|
||||
func shutdownServerAndWait(t *testing.T, s *server.Server) bool {
|
||||
listenSpec := s.GetListenEndpoint()
|
||||
routeListenSpec := s.GetRouteListenEndpoint()
|
||||
|
||||
s.Shutdown()
|
||||
|
||||
// For now, do this only on Windows. Lots of tests would fail
|
||||
// without this because the listen port would linger from one
|
||||
// test to another causing failures.
|
||||
checkShutdown := func(listen string) bool {
|
||||
down := false
|
||||
maxTime := time.Now().Add(5 * time.Second)
|
||||
for time.Now().Before(maxTime) {
|
||||
conn, err := net.Dial("tcp", listen)
|
||||
if err != nil {
|
||||
down = true
|
||||
break
|
||||
}
|
||||
conn.Close()
|
||||
// Retry after 50ms
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
return down
|
||||
}
|
||||
if listenSpec != "" {
|
||||
if !checkShutdown(listenSpec) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
if routeListenSpec != "" {
|
||||
if !checkShutdown(routeListenSpec) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func runRouteServer(t *testing.T) (*server.Server, *server.Options) {
|
||||
return RunServerWithConfig("./configs/cluster.conf")
|
||||
}
|
||||
@@ -146,6 +184,12 @@ func TestSendRouteSubAndUnsub(t *testing.T) {
|
||||
if rsid2 != rsid {
|
||||
t.Fatalf("Expected rsid's to match. %q vs %q\n", rsid, rsid2)
|
||||
}
|
||||
|
||||
// Explicitly shutdown the server, otherwise this test would
|
||||
// cause following test to fail.
|
||||
if down := shutdownServerAndWait(t, s); !down {
|
||||
t.Fatal("Unable to verify server was shutdown")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendRouteSolicit(t *testing.T) {
|
||||
|
||||
@@ -151,7 +151,7 @@ func stressConnect(t *testing.T, wg *sync.WaitGroup, errCh chan error, url strin
|
||||
|
||||
subName := fmt.Sprintf("foo.%d", index)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
for i := 0; i < 33; i++ {
|
||||
nc, err := nats.Connect(url, nats.RootCAs("./configs/certs/ca.pem"))
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("Unable to create TLS connection: %v\n", err)
|
||||
|
||||
Reference in New Issue
Block a user