diff --git a/conf/parse.go b/conf/parse.go index 32767c48..1552b9c4 100644 --- a/conf/parse.go +++ b/conf/parse.go @@ -140,7 +140,7 @@ func (p *parser) processItem(it item) error { case itemInteger: lastDigit := 0 for _, r := range it.val { - if !unicode.IsDigit(r) { + if !unicode.IsDigit(r) && r != '-' { break } lastDigit++ diff --git a/server/client_test.go b/server/client_test.go index 1009a1c4..6aa13c3f 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -634,20 +634,23 @@ func TestTwoTokenPubMatchSingleTokenSub(t *testing.T) { } func TestUnsubRace(t *testing.T) { - s := RunServer(nil) + opts := DefaultOptions() + s := RunServer(opts) defer s.Shutdown() - nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", - DefaultOptions.Host, - DefaultOptions.Port)) + url := fmt.Sprintf("nats://%s:%d", + s.getOpts().Host, + s.Addr().(*net.TCPAddr).Port, + ) + nc, err := nats.Connect(url) if err != nil { - t.Fatalf("Error creating client: %v\n", err) + t.Fatalf("Error creating client to %s: %v\n", url, err) } defer nc.Close() ncp, err := nats.Connect(fmt.Sprintf("nats://%s:%d", - DefaultOptions.Host, - DefaultOptions.Port)) + s.getOpts().Host, + s.Addr().(*net.TCPAddr).Port)) if err != nil { t.Fatalf("Error creating client: %v\n", err) } diff --git a/server/configs/reload/basic.conf b/server/configs/reload/basic.conf new file mode 100644 index 00000000..a12cf8e0 --- /dev/null +++ b/server/configs/reload/basic.conf @@ -0,0 +1 @@ +listen: localhost:-1 diff --git a/server/configs/reload/tls_test.conf b/server/configs/reload/tls_test.conf new file mode 100644 index 00000000..08e0da82 --- /dev/null +++ b/server/configs/reload/tls_test.conf @@ -0,0 +1,10 @@ + +# Simple TLS config file + +listen: localhost:-1 + +tls { + cert_file: "./configs/certs/server.pem" + key_file: "./configs/certs/key.pem" + timeout: 2 +} diff --git a/server/configs/reload/tls_verify_test.conf b/server/configs/reload/tls_verify_test.conf new file mode 100644 index 00000000..76de4d0a --- /dev/null +++ b/server/configs/reload/tls_verify_test.conf @@ -0,0 +1,12 @@ + +# Simple TLS config file + +listen: localhost:-1 + +tls { + cert_file: "./configs/certs/cert.new.pem" + key_file: "./configs/certs/key.new.pem" + ca_file: "./configs/certs/cert.new.pem" + verify: true + timeout: 2 +} diff --git a/server/log.go b/server/log.go index 59d4bfc7..2cb1e910 100644 --- a/server/log.go +++ b/server/log.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/nats-io/gnatsd/logger" + "fmt" ) // Logger interface of the NATS Server @@ -118,6 +119,11 @@ func (s *Server) Errorf(format string, v ...interface{}) { // Fatalf logs a fatal error func (s *Server) Fatalf(format string, v ...interface{}) { + s.mu.Lock() + // always store the fatal log + errStr := fmt.Sprintf(format, v) + s.fatalError = errStr + s.mu.Unlock() s.executeLogCall(func(logger Logger, format string, v ...interface{}) { logger.Fatalf(format, v...) }, format, v...) diff --git a/server/monitor_test.go b/server/monitor_test.go index 89ca91bd..19d5dcea 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -15,36 +15,37 @@ import ( "unicode" "github.com/nats-io/go-nats" + "net" ) -const CLIENT_PORT = 11224 -const MONITOR_PORT = 11424 -const CLUSTER_PORT = 12444 +const CLIENT_PORT = -1 +const MONITOR_PORT = -1 +const CLUSTER_PORT = -1 -var DefaultMonitorOptions = Options{ - Host: "localhost", - Port: CLIENT_PORT, - HTTPHost: "127.0.0.1", - HTTPPort: MONITOR_PORT, - Cluster: ClusterOpts{ - Host: "localhost", - Port: CLUSTER_PORT, - }, - NoLog: true, - NoSigs: true, +func DefaultMonitorOptions() *Options { + return &Options { + Host: "localhost", + Port: CLIENT_PORT, + HTTPHost: "127.0.0.1", + HTTPPort: MONITOR_PORT, + NoLog: true, + NoSigs: true, + Debug: true, + Trace: true, + } } func runMonitorServer() *Server { resetPreviousHTTPConnections() - opts := DefaultMonitorOptions - return RunServer(&opts) + opts := DefaultMonitorOptions() + return RunServer(opts) } func runMonitorServerNoHTTPPort() *Server { resetPreviousHTTPConnections() - opts := DefaultMonitorOptions + opts := DefaultMonitorOptions() opts.HTTPPort = 0 - return RunServer(&opts) + return RunServer(opts) } func resetPreviousHTTPConnections() { @@ -88,7 +89,8 @@ func TestNoMonitorPort(t *testing.T) { s := runMonitorServerNoHTTPPort() defer s.Shutdown() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + // this test might be meaningless now that we're testing with random ports? + url := fmt.Sprintf("http://localhost:%d/", 11245) if resp, err := http.Get(url + "varz"); err == nil { t.Fatalf("Expected error: Got %+v\n", resp) } @@ -104,7 +106,7 @@ func TestVarz(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "varz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -132,7 +134,7 @@ func TestVarz(t *testing.T) { t.Fatal("Expected start time to be within 10 seconds.") } - nc := createClientConnSubscribeAndPublish(t) + nc := createClientConnSubscribeAndPublish(t, s) defer nc.Close() resp, err = http.Get(url + "varz") @@ -176,7 +178,7 @@ func TestVarz(t *testing.T) { } // Test JSONP - respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "varz?callback=callback") + respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "varz?callback=callback") if errj != nil { t.Fatalf("Expected no error: Got %v\n", err) } @@ -191,7 +193,7 @@ func TestConnz(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -226,7 +228,7 @@ func TestConnz(t *testing.T) { } // Test with connections. - nc := createClientConnSubscribeAndPublish(t) + nc := createClientConnSubscribeAndPublish(t, s) defer nc.Close() resp, err = http.Get(url + "connz") @@ -311,7 +313,7 @@ func TestConnz(t *testing.T) { } // Test JSONP - respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "connz?callback=callback") + respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "connz?callback=callback") if errj != nil { t.Fatalf("Expected no error: Got %v\n", err) } @@ -326,10 +328,10 @@ func TestConnzWithSubs(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - nc := createClientConnSubscribeAndPublish(t) + nc := createClientConnSubscribeAndPublish(t, s) defer nc.Close() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?subs=1") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -359,16 +361,16 @@ func TestConnzLastActivity(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - nc := createClientConnSubscribeAndPublish(t) + nc := createClientConnSubscribeAndPublish(t, s) defer nc.Close() nc.Flush() - nc2 := createClientConnSubscribeAndPublish(t) + nc2 := createClientConnSubscribeAndPublish(t, s) defer nc2.Close() nc2.Flush() pollConz := func() *Connz { - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?subs=1") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -456,7 +458,7 @@ func TestConnzWithOffsetAndLimit(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) // Test that offset and limit ok when not enough connections resp, err := http.Get(url + "connz?offset=1&limit=1") @@ -505,10 +507,10 @@ func TestConnzWithOffsetAndLimit(t *testing.T) { DefaultConnListSize, c.Offset, c.Limit) } - cl1 := createClientConnSubscribeAndPublish(t) + cl1 := createClientConnSubscribeAndPublish(t, s) defer cl1.Close() - cl2 := createClientConnSubscribeAndPublish(t) + cl2 := createClientConnSubscribeAndPublish(t, s) defer cl2.Close() resp, err = http.Get(url + "connz?offset=1&limit=1") @@ -590,15 +592,16 @@ func TestConnzWithOffsetAndLimit(t *testing.T) { func TestConnzDefaultSorted(t *testing.T) { s := runMonitorServer() + s.ConfigureLogger() defer s.Shutdown() clients := make([]*nats.Conn, 4) for i := range clients { - clients[i] = createClientConnSubscribeAndPublish(t) + clients[i] = createClientConnSubscribeAndPublish(t, s) defer clients[i].Close() } - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -630,11 +633,11 @@ func TestConnzSortedByCid(t *testing.T) { clients := make([]*nats.Conn, 4) for i := range clients { - clients[i] = createClientConnSubscribeAndPublish(t) + clients[i] = createClientConnSubscribeAndPublish(t, s) defer clients[i].Close() } - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?sort=cid") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -665,7 +668,7 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) { defer s.Shutdown() // Create a connection and make it send more messages than others - firstClient := createClientConnSubscribeAndPublish(t) + firstClient := createClientConnSubscribeAndPublish(t, s) for i := 0; i < 100; i++ { firstClient.Publish("foo", []byte("Hello World")) } @@ -674,11 +677,11 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) { clients := make([]*nats.Conn, 3) for i := range clients { - clients[i] = createClientConnSubscribeAndPublish(t) + clients[i] = createClientConnSubscribeAndPublish(t, s) defer clients[i].Close() } - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?sort=bytes_to") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -704,7 +707,7 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) { c.Conns[0].OutBytes, c.Conns[1].OutBytes, c.Conns[2].OutBytes, c.Conns[3].OutBytes) } - url = fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url = fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err = http.Get(url + "connz?sort=msgs_to") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -730,7 +733,7 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) { c.Conns[0].OutMsgs, c.Conns[1].OutMsgs, c.Conns[2].OutMsgs, c.Conns[3].OutMsgs) } - url = fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url = fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err = http.Get(url + "connz?sort=bytes_from") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -756,7 +759,7 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) { c.Conns[0].InBytes, c.Conns[1].InBytes, c.Conns[2].InBytes, c.Conns[3].InBytes) } - url = fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url = fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err = http.Get(url + "connz?sort=msgs_from") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -787,16 +790,16 @@ func TestConnzSortedByPending(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - firstClient := createClientConnSubscribeAndPublish(t) + firstClient := createClientConnSubscribeAndPublish(t, s) firstClient.Subscribe("hello.world", func(m *nats.Msg) {}) clients := make([]*nats.Conn, 3) for i := range clients { - clients[i] = createClientConnSubscribeAndPublish(t) + clients[i] = createClientConnSubscribeAndPublish(t, s) defer clients[i].Close() } defer firstClient.Close() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?sort=pending") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -827,16 +830,16 @@ func TestConnzSortedBySubs(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - firstClient := createClientConnSubscribeAndPublish(t) + firstClient := createClientConnSubscribeAndPublish(t, s) firstClient.Subscribe("hello.world", func(m *nats.Msg) {}) clients := make([]*nats.Conn, 3) for i := range clients { - clients[i] = createClientConnSubscribeAndPublish(t) + clients[i] = createClientConnSubscribeAndPublish(t, s) defer clients[i].Close() } defer firstClient.Close() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?sort=subs") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -864,22 +867,23 @@ func TestConnzSortedBySubs(t *testing.T) { } func TestConnzSortedByLast(t *testing.T) { - s := runMonitorServer() + opts := DefaultMonitorOptions() + s := RunServer(opts) defer s.Shutdown() - firstClient := createClientConnSubscribeAndPublish(t) + firstClient := createClientConnSubscribeAndPublish(t, s) defer firstClient.Close() firstClient.Subscribe("hello.world", func(m *nats.Msg) {}) firstClient.Flush() clients := make([]*nats.Conn, 3) for i := range clients { - clients[i] = createClientConnSubscribeAndPublish(t) + clients[i] = createClientConnSubscribeAndPublish(t, s) defer clients[i].Close() clients[i].Flush() } - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?sort=last") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -912,12 +916,12 @@ func TestConnzSortedByUptime(t *testing.T) { clients := make([]*nats.Conn, 5) for i := range clients { - clients[i] = createClientConnSubscribeAndPublish(t) + clients[i] = createClientConnSubscribeAndPublish(t, s) defer clients[i].Close() time.Sleep(250 * time.Millisecond) } - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?sort=uptime") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -949,12 +953,12 @@ func TestConnzSortedByIdle(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - firstClient := createClientConnSubscribeAndPublish(t) + firstClient := createClientConnSubscribeAndPublish(t, s) defer firstClient.Close() firstClient.Subscribe("client.1", func(m *nats.Msg) {}) firstClient.Flush() - secondClient := createClientConnSubscribeAndPublish(t) + secondClient := createClientConnSubscribeAndPublish(t, s) defer secondClient.Close() secondClient.Subscribe("client.2", func(m *nats.Msg) {}) secondClient.Flush() @@ -963,7 +967,7 @@ func TestConnzSortedByIdle(t *testing.T) { time.Sleep(time.Second) firstClient.Publish("client.1", []byte("new message")) - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?sort=idle") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -1011,16 +1015,16 @@ func TestConnzSortBadRequest(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - firstClient := createClientConnSubscribeAndPublish(t) + firstClient := createClientConnSubscribeAndPublish(t, s) firstClient.Subscribe("hello.world", func(m *nats.Msg) {}) clients := make([]*nats.Conn, 3) for i := range clients { - clients[i] = createClientConnSubscribeAndPublish(t) + clients[i] = createClientConnSubscribeAndPublish(t, s) defer clients[i].Close() } defer firstClient.Close() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz?sort=foo") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -1032,28 +1036,34 @@ func TestConnzSortBadRequest(t *testing.T) { } func TestConnzWithRoutes(t *testing.T) { - s := runMonitorServer() + + opts := DefaultMonitorOptions() + opts.Cluster.Host = "localhost" + opts.Cluster.Port = CLUSTER_PORT + + s := RunServer(opts) + defer s.Shutdown() - var opts = Options{ + opts = &Options{ Host: "localhost", - Port: CLIENT_PORT + 1, + Port: -1, Cluster: ClusterOpts{ Host: "localhost", - Port: CLUSTER_PORT + 1, + Port: -1, }, NoLog: true, NoSigs: true, } - routeURL, _ := url.Parse(fmt.Sprintf("nats-route://127.0.0.1:%d", CLUSTER_PORT)) + routeURL, _ := url.Parse(fmt.Sprintf("nats-route://127.0.0.1:%d", s.ClusterAddr().(*net.TCPAddr).Port)) opts.Routes = []*url.URL{routeURL} - sc := RunServer(&opts) + sc := RunServer(opts) defer sc.Shutdown() time.Sleep(time.Second) - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -1086,7 +1096,7 @@ func TestConnzWithRoutes(t *testing.T) { } // Now check routez - url = fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url = fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err = http.Get(url + "routez") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -1120,7 +1130,7 @@ func TestConnzWithRoutes(t *testing.T) { } // Test JSONP - respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "routez?callback=callback") + respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "routez?callback=callback") if errj != nil { t.Fatalf("Expected no error: Got %v\n", err) } @@ -1135,10 +1145,10 @@ func TestSubsz(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - nc := createClientConnSubscribeAndPublish(t) + nc := createClientConnSubscribeAndPublish(t, s) defer nc.Close() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "subscriptionsz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -1171,7 +1181,7 @@ func TestSubsz(t *testing.T) { } // Test JSONP - respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "subscriptionsz?callback=callback") + respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "subscriptionsz?callback=callback") ct = respj.Header.Get("Content-Type") if errj != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -1187,10 +1197,10 @@ func TestHandleRoot(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - nc := createClientConnSubscribeAndPublish(t) + nc := createClientConnSubscribeAndPublish(t, s) defer nc.Close() - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)) + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)) if err != nil { t.Fatalf("Expected no error: Got %v\n", err) } @@ -1220,10 +1230,10 @@ func TestConnzWithNamedClient(t *testing.T) { defer s.Shutdown() clientName := "test-client" - nc := createClientConnWithName(t, clientName) + nc := createClientConnWithName(t, clientName, s) defer nc.Close() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "connz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -1260,10 +1270,13 @@ func TestConnzWithNamedClient(t *testing.T) { } // Create a connection to test ConnInfo -func createClientConnSubscribeAndPublish(t *testing.T) *nats.Conn { - nc, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", CLIENT_PORT)) +func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn { + natsUrl := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port) + client := nats.DefaultOptions + client.Servers = []string{natsUrl} + nc, err := client.Connect() if err != nil { - t.Fatalf("Error creating client: %v\n", err) + t.Fatalf("Error creating client: %v to: %s\n", err, natsUrl) } ch := make(chan bool) @@ -1274,8 +1287,8 @@ func createClientConnSubscribeAndPublish(t *testing.T) *nats.Conn { return nc } -func createClientConnWithName(t *testing.T, name string) *nats.Conn { - natsURI := fmt.Sprintf("nats://localhost:%d", CLIENT_PORT) +func createClientConnWithName(t *testing.T, name string, s *Server) *nats.Conn { + natsURI := fmt.Sprintf("nats://localhost:%d", s.Addr().(*net.TCPAddr).Port) client := nats.DefaultOptions client.Servers = []string{natsURI} @@ -1292,7 +1305,7 @@ func TestStacksz(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) resp, err := http.Get(url + "stacksz") if err != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -1315,7 +1328,7 @@ func TestStacksz(t *testing.T) { t.Fatalf("Result does not seem to contain server's stacks:\n%v", str) } // Test JSONP - respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "subscriptionsz?callback=callback") + respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "subscriptionsz?callback=callback") ct = respj.Header.Get("Content-Type") if errj != nil { t.Fatalf("Expected no error: Got %v\n", err) @@ -1330,7 +1343,7 @@ func TestConcurrentMonitoring(t *testing.T) { s := runMonitorServer() defer s.Shutdown() - url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) // Get some endpoints. Make sure we have at least varz, // and the more the merrier. endpoints := []string{"varz", "varz", "varz", "connz", "connz", "subsz", "subsz", "routez", "routez"} diff --git a/server/reload.go b/server/reload.go index f65adee3..d112f801 100644 --- a/server/reload.go +++ b/server/reload.go @@ -123,9 +123,23 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { case "tlstimeout": // TLSTimeout change is picked up when Options is swapped. continue + case "port": + // check to see if newValue == 0 and continue if so. + if newValue == 0 { + // ignore RANDOM_PORT + continue + } else { + return nil, fmt.Errorf("Config reload not supported for %s", field.Name) + } + case "http_port", "https_port": + // check to see if newValue == -1 (RANDOM_PORT for http/https monitoring port) + if newValue == -1 { + continue + } + fallthrough default: // Bail out if attempting to reload any unsupported options. - return nil, fmt.Errorf("Config reload not supported for %s", field.Name) + return nil, fmt.Errorf("Config reload not supported for %s: old=%v, new=%v", field.Name, oldValue, newValue) } } diff --git a/server/reload_test.go b/server/reload_test.go index d2a183c9..cbc9c836 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/nats-io/go-nats" + "net" ) // Ensure Reload returns an error when attempting to reload a server that did @@ -225,7 +226,7 @@ func TestConfigReloadRotateTLS(t *testing.T) { } config := filepath.Join(dir, "tmp.conf") - if err := os.Symlink("./configs/tls_test.conf", config); err != nil { + if err := os.Symlink("./configs/reload/tls_test.conf", config); err != nil { t.Fatalf("Error creating symlink: %v", err) } defer os.Remove(config) @@ -239,7 +240,7 @@ func TestConfigReloadRotateTLS(t *testing.T) { defer server.Shutdown() // Ensure we can connect as a sanity check. - addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port) nc, err := nats.Connect(addr, nats.Secure()) if err != nil { t.Fatalf("Error creating client: %v", err) @@ -255,7 +256,7 @@ func TestConfigReloadRotateTLS(t *testing.T) { if err := os.Remove(config); err != nil { t.Fatalf("Error deleting symlink: %v", err) } - if err := os.Symlink("./configs/tls_verify_test.conf", config); err != nil { + if err := os.Symlink("./configs/reload/tls_verify_test.conf", config); err != nil { t.Fatalf("Error creating symlink: %v", err) } if err := server.Reload(); err != nil { @@ -298,7 +299,7 @@ func TestConfigReloadEnableTLS(t *testing.T) { } config := filepath.Join(dir, "tmp.conf") - if err := os.Symlink("./configs/basic.conf", config); err != nil { + if err := os.Symlink("./configs/reload/basic.conf", config); err != nil { t.Fatalf("Error creating symlink: %v", err) } defer os.Remove(config) @@ -312,7 +313,7 @@ func TestConfigReloadEnableTLS(t *testing.T) { defer server.Shutdown() // Ensure we can connect as a sanity check. - addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port) nc, err := nats.Connect(addr) if err != nil { t.Fatalf("Error creating client: %v", err) @@ -323,7 +324,7 @@ func TestConfigReloadEnableTLS(t *testing.T) { if err := os.Remove(config); err != nil { t.Fatalf("Error deleting symlink: %v", err) } - if err := os.Symlink("./configs/tls_test.conf", config); err != nil { + if err := os.Symlink("./configs/reload/tls_test.conf", config); err != nil { t.Fatalf("Error creating symlink: %v", err) } if err := server.Reload(); err != nil { @@ -354,7 +355,7 @@ func TestConfigReloadDisableTLS(t *testing.T) { } config := filepath.Join(dir, "tmp.conf") - if err := os.Symlink("./configs/tls_test.conf", config); err != nil { + if err := os.Symlink("./configs/reload/tls_test.conf", config); err != nil { t.Fatalf("Error creating symlink: %v", err) } defer os.Remove(config) @@ -368,7 +369,7 @@ func TestConfigReloadDisableTLS(t *testing.T) { defer server.Shutdown() // Ensure we can connect as a sanity check. - addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port) nc, err := nats.Connect(addr, nats.Secure()) if err != nil { t.Fatalf("Error creating client: %v", err) @@ -379,7 +380,7 @@ func TestConfigReloadDisableTLS(t *testing.T) { if err := os.Remove(config); err != nil { t.Fatalf("Error deleting symlink: %v", err) } - if err := os.Symlink("./configs/basic.conf", config); err != nil { + if err := os.Symlink("./configs/reload/basic.conf", config); err != nil { t.Fatalf("Error creating symlink: %v", err) } if err := server.Reload(); err != nil { diff --git a/server/route.go b/server/route.go index b9267342..43109482 100644 --- a/server/route.go +++ b/server/route.go @@ -602,7 +602,19 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { // Snapshot server options. opts := s.getOpts() - hp := net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(opts.Cluster.Port)) + // Get all possible URLs (when server listens to 0.0.0.0). + // This is going to be sent to other Servers, so that they can let their + // clients know about us. + clientConnectURLs := s.getClientConnectURLs() + + // Snapshot server options. + port := opts.Cluster.Port + + if port == -1 { + port = 0 + } + + hp := net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(port)) s.Noticef("Listening for route connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { @@ -612,6 +624,28 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { return } + // Check for TLSConfig + tlsReq := opts.Cluster.TLSConfig != nil + info := Info{ + ID: s.info.ID, + Version: s.info.Version, + Host: opts.Cluster.Host, + Port: l.Addr().(*net.TCPAddr).Port, + AuthRequired: false, + TLSRequired: tlsReq, + SSLRequired: tlsReq, + TLSVerify: tlsReq, + MaxPayload: s.info.MaxPayload, + ClientConnectURLs: clientConnectURLs, + } + // Check for Auth items + if opts.Cluster.Username != "" { + info.AuthRequired = true + } + s.routeInfo = info + b, _ := json.Marshal(info) + s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b)) + // Setup state that can enable shutdown s.mu.Lock() s.routeListener = l @@ -657,36 +691,6 @@ func (s *Server) StartRouting(clientListenReady chan struct{}) { // the possible ephemeral port to be selected. <-clientListenReady - // Get all possible URLs (when server listens to 0.0.0.0). - // This is going to be sent to other Servers, so that they can let their - // clients know about us. - clientConnectURLs := s.getClientConnectURLs() - - // Snapshot server options. - opts := s.getOpts() - - // Check for TLSConfig - tlsReq := opts.Cluster.TLSConfig != nil - info := Info{ - ID: s.info.ID, - Version: s.info.Version, - Host: opts.Cluster.Host, - Port: opts.Cluster.Port, - AuthRequired: false, - TLSRequired: tlsReq, - SSLRequired: tlsReq, - TLSVerify: tlsReq, - MaxPayload: s.info.MaxPayload, - ClientConnectURLs: clientConnectURLs, - } - // Check for Auth items - if opts.Cluster.Username != "" { - info.AuthRequired = true - } - s.routeInfo = info - b, _ := json.Marshal(info) - s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b)) - // Spin up the accept loop ch := make(chan struct{}) go s.routeAcceptLoop(ch) diff --git a/server/routes_test.go b/server/routes_test.go index d47c2cc3..588f6b7e 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -172,9 +172,9 @@ func checkClusterFormed(t *testing.T, servers ...*Server) { // Helper function to generate next opts to make sure no port conflicts etc. func nextServerOpts(opts *Options) *Options { nopts := *opts - nopts.Port++ - nopts.Cluster.Port++ - nopts.HTTPPort++ + nopts.Port = -1 + nopts.Cluster.Port = -1 + nopts.HTTPPort = -1 return &nopts } @@ -187,12 +187,13 @@ func TestSeedSolicitWorks(t *testing.T) { defer srvSeed.Shutdown() optsA := nextServerOpts(optsSeed) - optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port)) + optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, + srvSeed.ClusterAddr().(*net.TCPAddr).Port)) srvA := RunServer(optsA) defer srvA.Shutdown() - urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port) + urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, srvA.ClusterAddr().(*net.TCPAddr).Port) nc1, err := nats.Connect(urlA) if err != nil { @@ -206,12 +207,13 @@ func TestSeedSolicitWorks(t *testing.T) { nc1.Flush() optsB := nextServerOpts(optsA) - optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port)) + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, + srvSeed.ClusterAddr().(*net.TCPAddr).Port)) srvB := RunServer(optsB) defer srvB.Shutdown() - urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, srvB.ClusterAddr().(*net.TCPAddr).Port) nc2, err := nats.Connect(urlB) if err != nil { @@ -239,13 +241,15 @@ func TestTLSSeedSolicitWorks(t *testing.T) { srvSeed := RunServer(optsSeed) defer srvSeed.Shutdown() + seedRouteUrl := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, + srvSeed.ClusterAddr().(*net.TCPAddr).Port) optsA := nextServerOpts(optsSeed) - optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port)) + optsA.Routes = RoutesFromStr(seedRouteUrl) srvA := RunServer(optsA) defer srvA.Shutdown() - urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port) + urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, srvA.Addr().(*net.TCPAddr).Port) nc1, err := nats.Connect(urlA) if err != nil { @@ -259,12 +263,12 @@ func TestTLSSeedSolicitWorks(t *testing.T) { nc1.Flush() optsB := nextServerOpts(optsA) - optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port)) + optsB.Routes = RoutesFromStr(seedRouteUrl) srvB := RunServer(optsB) defer srvB.Shutdown() - urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, srvB.Addr().(*net.TCPAddr).Port) nc2, err := nats.Connect(urlB) if err != nil { @@ -292,13 +296,15 @@ func TestChainedSolicitWorks(t *testing.T) { srvSeed := RunServer(optsSeed) defer srvSeed.Shutdown() + seedRouteUrl := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, + srvSeed.ClusterAddr().(*net.TCPAddr).Port) optsA := nextServerOpts(optsSeed) - optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port)) + optsA.Routes = RoutesFromStr(seedRouteUrl) srvA := RunServer(optsA) defer srvA.Shutdown() - urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, optsSeed.Port) + urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, srvA.Addr().(*net.TCPAddr).Port) nc1, err := nats.Connect(urlSeed) if err != nil { @@ -313,12 +319,13 @@ func TestChainedSolicitWorks(t *testing.T) { optsB := nextServerOpts(optsA) // Server B connects to A - optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port)) + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, + srvA.ClusterAddr().(*net.TCPAddr).Port)) srvB := RunServer(optsB) defer srvB.Shutdown() - urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, srvB.Addr().(*net.TCPAddr).Port) nc2, err := nats.Connect(urlB) if err != nil { @@ -346,13 +353,15 @@ func TestTLSChainedSolicitWorks(t *testing.T) { srvSeed := RunServer(optsSeed) defer srvSeed.Shutdown() + urlSeedRoute := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, + srvSeed.ClusterAddr().(*net.TCPAddr).Port) optsA := nextServerOpts(optsSeed) - optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port)) + optsA.Routes = RoutesFromStr(urlSeedRoute) srvA := RunServer(optsA) defer srvA.Shutdown() - urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, optsSeed.Port) + urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, srvSeed.Addr().(*net.TCPAddr).Port) nc1, err := nats.Connect(urlSeed) if err != nil { @@ -367,12 +376,13 @@ func TestTLSChainedSolicitWorks(t *testing.T) { optsB := nextServerOpts(optsA) // Server B connects to A - optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port)) + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, + srvA.ClusterAddr().(*net.TCPAddr).Port)) srvB := RunServer(optsB) defer srvB.Shutdown() - urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port) + urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, srvB.Addr().(*net.TCPAddr).Port) nc2, err := nats.Connect(urlB) if err != nil { @@ -397,10 +407,10 @@ func TestRouteTLSHandshakeError(t *testing.T) { srvSeed := RunServer(optsSeed) defer srvSeed.Shutdown() - opts := DefaultOptions + opts := DefaultOptions() opts.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port)) - srv := RunServer(&opts) + srv := RunServer(opts) defer srv.Shutdown() time.Sleep(500 * time.Millisecond) @@ -419,11 +429,11 @@ func TestRouteTLSHandshakeError(t *testing.T) { } func TestBlockedShutdownOnRouteAcceptLoopFailure(t *testing.T) { - opts := DefaultOptions + opts := DefaultOptions() opts.Cluster.Host = "x.x.x.x" opts.Cluster.Port = 7222 - s := New(&opts) + s := New(opts) go s.Start() // Wait a second time.Sleep(time.Second) @@ -443,7 +453,7 @@ func TestBlockedShutdownOnRouteAcceptLoopFailure(t *testing.T) { } func TestRouteUseIPv6(t *testing.T) { - opts := DefaultOptions + opts := DefaultOptions() opts.Cluster.Host = "::" opts.Cluster.Port = 6222 @@ -456,7 +466,7 @@ func TestRouteUseIPv6(t *testing.T) { t.Skipf("Skipping this test since there is no IPv6 support on this host: %v", err) } - s := RunServer(&opts) + s := RunServer(opts) defer s.Shutdown() routeUp := false @@ -484,16 +494,18 @@ func TestRouteUseIPv6(t *testing.T) { } func TestClientConnectToRoutePort(t *testing.T) { - opts := DefaultOptions + opts := DefaultOptions() + // Since client will first connect to the route listen port, set the // cluster's Host to localhost so it works on Windows too, since on // Windows, a client can't use 0.0.0.0 in a connect. opts.Cluster.Host = "localhost" opts.Cluster.NoAdvertise = true - s := RunServer(&opts) - defer s.Shutdown() + s := RunServer(opts) + s.Noticef("%+v\n", opts) + // defer s.Shutdown() - url := fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, opts.Cluster.Port) + url := fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, s.ClusterAddr().(*net.TCPAddr).Port) clientURL := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) // When connecting to the ROUTE port, the client library will receive the // CLIENT port in the INFO protocol. This URL is added to the client's pool @@ -504,6 +516,7 @@ func TestClientConnectToRoutePort(t *testing.T) { // attempts rather small. total := 10 for i := 0; i < total; i++ { + s.Noticef("URL: %s", url) nc, err := nats.Connect(url) if err != nil { t.Fatalf("Unexepected error on connect: %v", err) diff --git a/server/server.go b/server/server.go index a57cf599..f855331c 100644 --- a/server/server.go +++ b/server/server.go @@ -68,6 +68,7 @@ type Server struct { start time.Time http net.Listener httpHandler http.Handler + profiler net.Listener httpReqStats map[string]uint64 routeListener net.Listener routeInfo Info @@ -78,6 +79,7 @@ type Server struct { grRunning bool grWG sync.WaitGroup // to wait on various go routines cproto int64 // number of clients supporting async INFO + fatalError string // Captures the error string for any fatal error logging struct { sync.RWMutex logger Logger @@ -329,6 +331,12 @@ func (s *Server) Shutdown() { s.http = nil } + // Kick Profiling if its running + if s.profiler != nil { + doneExpected++ + s.profiler.Close() + } + // Release the solicited routes connect go routines. close(s.rcQuit) @@ -363,12 +371,12 @@ func (s *Server) AcceptLoop(clr chan struct{}) { opts := s.getOpts() hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) - s.Noticef("Listening for client connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { s.Fatalf("Error listening on port: %s, %q", hp, e) return } + s.Noticef("Listening for client connections on %s", l.Addr().String()) // Alert of TLS enabled. if opts.TLSConfig != nil { @@ -436,16 +444,43 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { + s.Noticef("in StartProfiler %s", "HOW!?!?") // Snapshot server options. opts := s.getOpts() + s.Noticef(opts.HTTPHost) + + port := opts.ProfPort + + // Check for Random Port + if port == -1 { + port = 0 + } + + hp := net.JoinHostPort(opts.Host, strconv.Itoa(port)) + + l, err := net.Listen("tcp", hp) + s.Noticef("profiling port: %d", l.Addr().(*net.TCPAddr).Port) + + if err != nil { + s.Fatalf("error starting profiler: %s", err) + } + + srv := &http.Server{ + Addr: hp, + Handler: http.DefaultServeMux, + ReadTimeout: 2 * time.Second, + WriteTimeout: 2 * time.Second, + MaxHeaderBytes: 1 << 20, + } + + s.mu.Lock() + s.profiler = l + s.mu.Unlock() - s.Noticef("Starting profiling on http port %d", opts.ProfPort) - hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.ProfPort)) go func() { - err := http.ListenAndServe(hp, nil) - if err != nil { - s.Fatalf("error starting monitor server: %s", err) - } + // if this errors out, it's probably because the server is being shutdown + srv.Serve(l) + s.done <- true }() } @@ -889,6 +924,16 @@ func (s *Server) MonitorAddr() net.Addr { return s.http.Addr() } +// RouteAddr returns the net.Addr object for the route listener. +func (s *Server) ClusterAddr() net.Addr { + s.mu.Lock() + defer s.mu.Unlock() + if s.routeListener == nil { + return nil + } + return s.routeListener.Addr() +} + // ReadyForConnections returns `true` if the server is ready to accept client // and, if routing is enabled, route connections. If after the duration // `dur` the server is still not ready, returns `false`. @@ -977,3 +1022,8 @@ func (s *Server) getClientConnectURLs() []string { } return urls } + +// Return startup error, if there is one. Useful for unit testing. +func (s *Server) FatalError() string { + return s.fatalError +} diff --git a/server/server_test.go b/server/server_test.go index d88c676e..a7c77553 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -13,26 +13,35 @@ import ( "github.com/nats-io/go-nats" ) -var DefaultOptions = Options{ - Host: "localhost", - Port: 11222, - HTTPPort: 11333, - Cluster: ClusterOpts{Port: 11444}, - ProfPort: 11280, - NoLog: true, - NoSigs: true, +func DefaultOptions() *Options { + return &Options{ + Host: "localhost", + Port: -1, + HTTPPort: -1, + Cluster: ClusterOpts{Port: -1}, + ProfPort: -1, + NoLog: true, + NoSigs: true, + Debug: true, + Trace: true, + } } // New Go Routine based server func RunServer(opts *Options) *Server { if opts == nil { - opts = &DefaultOptions + opts = DefaultOptions() } s := New(opts) + if s == nil { panic("No NATS Server object returned.") } + if !opts.NoLog { + s.ConfigureLogger() + } + // Run server in Go routine. go s.Start() @@ -43,8 +52,19 @@ func RunServer(opts *Options) *Server { return s } +func TestStartProfiler(t *testing.T) { + s := New(DefaultOptions()) + s.StartProfiler() + s.mu.Lock() + s.profiler.Close() + s.mu.Unlock() +} + func TestStartupAndShutdown(t *testing.T) { - s := RunServer(&DefaultOptions) + + opts := DefaultOptions() + + s := RunServer(opts) defer s.Shutdown() if !s.isRunning() { @@ -125,13 +145,13 @@ func TestTlsCipher(t *testing.T) { } func TestGetConnectURLs(t *testing.T) { - opts := DefaultOptions + opts := DefaultOptions() opts.Port = 4222 var globalIP net.IP checkGlobalConnectURLs := func() { - s := New(&opts) + s := New(opts) defer s.Shutdown() urls := s.getClientConnectURLs() @@ -167,7 +187,7 @@ func TestGetConnectURLs(t *testing.T) { } checkConnectURLsHasOnlyOne := func() { - s := New(&opts) + s := New(opts) defer s.Shutdown() urls := s.getClientConnectURLs() @@ -195,24 +215,27 @@ func TestGetConnectURLs(t *testing.T) { } func TestNoDeadlockOnStartFailure(t *testing.T) { - opts := DefaultOptions + opts := DefaultOptions() opts.Host = "x.x.x.x" // bad host opts.Port = 4222 + opts.HTTPHost = opts.Host opts.Cluster.Host = "localhost" - opts.Cluster.Port = 6222 + opts.Cluster.Port = -1 + opts.ProfPort = -1 + s := New(opts) - s := New(&opts) // This should return since it should fail to start a listener // on x.x.x.x:4222 s.Start() + // We should be able to shutdown s.Shutdown() } func TestMaxConnections(t *testing.T) { - opts := DefaultOptions + opts := DefaultOptions() opts.MaxConn = 1 - s := RunServer(&opts) + s := RunServer(opts) defer s.Shutdown() addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) @@ -277,10 +300,9 @@ func TestProcessCommandLineArgs(t *testing.T) { } func TestWriteDeadline(t *testing.T) { - opts := DefaultOptions + opts := DefaultOptions() opts.WriteDeadline = 20 * time.Millisecond - opts.NoLog = false - s := RunServer(&opts) + s := RunServer(opts) defer s.Shutdown() c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second) @@ -331,10 +353,10 @@ func TestWriteDeadline(t *testing.T) { } func TestRandomPorts(t *testing.T) { - opts := DefaultOptions + opts := DefaultOptions() opts.HTTPPort = -1 opts.Port = -1 - s := RunServer(&opts) + s := RunServer(opts) defer s.Shutdown() @@ -353,10 +375,10 @@ func TestRandomPorts(t *testing.T) { } func TestNilMonitoringPort(t *testing.T) { - opts := DefaultOptions + opts := DefaultOptions() opts.HTTPPort = 0 opts.HTTPSPort = 0 - s := RunServer(&opts) + s := RunServer(opts) defer s.Shutdown()