mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
changes to support random ports for clusters and profiler.
This commit is contained in:
@@ -140,7 +140,7 @@ func (p *parser) processItem(it item) error {
|
||||
case itemInteger:
|
||||
lastDigit := 0
|
||||
for _, r := range it.val {
|
||||
if !unicode.IsDigit(r) {
|
||||
if !unicode.IsDigit(r) && r != '-' {
|
||||
break
|
||||
}
|
||||
lastDigit++
|
||||
|
||||
@@ -634,20 +634,23 @@ func TestTwoTokenPubMatchSingleTokenSub(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUnsubRace(t *testing.T) {
|
||||
s := RunServer(nil)
|
||||
opts := DefaultOptions()
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d",
|
||||
DefaultOptions.Host,
|
||||
DefaultOptions.Port))
|
||||
url := fmt.Sprintf("nats://%s:%d",
|
||||
s.getOpts().Host,
|
||||
s.Addr().(*net.TCPAddr).Port,
|
||||
)
|
||||
nc, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v\n", err)
|
||||
t.Fatalf("Error creating client to %s: %v\n", url, err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
ncp, err := nats.Connect(fmt.Sprintf("nats://%s:%d",
|
||||
DefaultOptions.Host,
|
||||
DefaultOptions.Port))
|
||||
s.getOpts().Host,
|
||||
s.Addr().(*net.TCPAddr).Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v\n", err)
|
||||
}
|
||||
|
||||
1
server/configs/reload/basic.conf
Normal file
1
server/configs/reload/basic.conf
Normal file
@@ -0,0 +1 @@
|
||||
listen: localhost:-1
|
||||
10
server/configs/reload/tls_test.conf
Normal file
10
server/configs/reload/tls_test.conf
Normal file
@@ -0,0 +1,10 @@
|
||||
|
||||
# Simple TLS config file
|
||||
|
||||
listen: localhost:-1
|
||||
|
||||
tls {
|
||||
cert_file: "./configs/certs/server.pem"
|
||||
key_file: "./configs/certs/key.pem"
|
||||
timeout: 2
|
||||
}
|
||||
12
server/configs/reload/tls_verify_test.conf
Normal file
12
server/configs/reload/tls_verify_test.conf
Normal file
@@ -0,0 +1,12 @@
|
||||
|
||||
# Simple TLS config file
|
||||
|
||||
listen: localhost:-1
|
||||
|
||||
tls {
|
||||
cert_file: "./configs/certs/cert.new.pem"
|
||||
key_file: "./configs/certs/key.new.pem"
|
||||
ca_file: "./configs/certs/cert.new.pem"
|
||||
verify: true
|
||||
timeout: 2
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/nats-io/gnatsd/logger"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Logger interface of the NATS Server
|
||||
@@ -118,6 +119,11 @@ func (s *Server) Errorf(format string, v ...interface{}) {
|
||||
|
||||
// Fatalf logs a fatal error
|
||||
func (s *Server) Fatalf(format string, v ...interface{}) {
|
||||
s.mu.Lock()
|
||||
// always store the fatal log
|
||||
errStr := fmt.Sprintf(format, v)
|
||||
s.fatalError = errStr
|
||||
s.mu.Unlock()
|
||||
s.executeLogCall(func(logger Logger, format string, v ...interface{}) {
|
||||
logger.Fatalf(format, v...)
|
||||
}, format, v...)
|
||||
|
||||
@@ -15,36 +15,37 @@ import (
|
||||
"unicode"
|
||||
|
||||
"github.com/nats-io/go-nats"
|
||||
"net"
|
||||
)
|
||||
|
||||
const CLIENT_PORT = 11224
|
||||
const MONITOR_PORT = 11424
|
||||
const CLUSTER_PORT = 12444
|
||||
const CLIENT_PORT = -1
|
||||
const MONITOR_PORT = -1
|
||||
const CLUSTER_PORT = -1
|
||||
|
||||
var DefaultMonitorOptions = Options{
|
||||
func DefaultMonitorOptions() *Options {
|
||||
return &Options {
|
||||
Host: "localhost",
|
||||
Port: CLIENT_PORT,
|
||||
HTTPHost: "127.0.0.1",
|
||||
HTTPPort: MONITOR_PORT,
|
||||
Cluster: ClusterOpts{
|
||||
Host: "localhost",
|
||||
Port: CLUSTER_PORT,
|
||||
},
|
||||
NoLog: true,
|
||||
NoSigs: true,
|
||||
Debug: true,
|
||||
Trace: true,
|
||||
}
|
||||
}
|
||||
|
||||
func runMonitorServer() *Server {
|
||||
resetPreviousHTTPConnections()
|
||||
opts := DefaultMonitorOptions
|
||||
return RunServer(&opts)
|
||||
opts := DefaultMonitorOptions()
|
||||
return RunServer(opts)
|
||||
}
|
||||
|
||||
func runMonitorServerNoHTTPPort() *Server {
|
||||
resetPreviousHTTPConnections()
|
||||
opts := DefaultMonitorOptions
|
||||
opts := DefaultMonitorOptions()
|
||||
opts.HTTPPort = 0
|
||||
return RunServer(&opts)
|
||||
return RunServer(opts)
|
||||
}
|
||||
|
||||
func resetPreviousHTTPConnections() {
|
||||
@@ -88,7 +89,8 @@ func TestNoMonitorPort(t *testing.T) {
|
||||
s := runMonitorServerNoHTTPPort()
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
// this test might be meaningless now that we're testing with random ports?
|
||||
url := fmt.Sprintf("http://localhost:%d/", 11245)
|
||||
if resp, err := http.Get(url + "varz"); err == nil {
|
||||
t.Fatalf("Expected error: Got %+v\n", resp)
|
||||
}
|
||||
@@ -104,7 +106,7 @@ func TestVarz(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "varz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -132,7 +134,7 @@ func TestVarz(t *testing.T) {
|
||||
t.Fatal("Expected start time to be within 10 seconds.")
|
||||
}
|
||||
|
||||
nc := createClientConnSubscribeAndPublish(t)
|
||||
nc := createClientConnSubscribeAndPublish(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
resp, err = http.Get(url + "varz")
|
||||
@@ -176,7 +178,7 @@ func TestVarz(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test JSONP
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "varz?callback=callback")
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "varz?callback=callback")
|
||||
if errj != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
}
|
||||
@@ -191,7 +193,7 @@ func TestConnz(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -226,7 +228,7 @@ func TestConnz(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test with connections.
|
||||
nc := createClientConnSubscribeAndPublish(t)
|
||||
nc := createClientConnSubscribeAndPublish(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
resp, err = http.Get(url + "connz")
|
||||
@@ -311,7 +313,7 @@ func TestConnz(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test JSONP
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "connz?callback=callback")
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "connz?callback=callback")
|
||||
if errj != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
}
|
||||
@@ -326,10 +328,10 @@ func TestConnzWithSubs(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
nc := createClientConnSubscribeAndPublish(t)
|
||||
nc := createClientConnSubscribeAndPublish(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?subs=1")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -359,16 +361,16 @@ func TestConnzLastActivity(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
nc := createClientConnSubscribeAndPublish(t)
|
||||
nc := createClientConnSubscribeAndPublish(t, s)
|
||||
defer nc.Close()
|
||||
nc.Flush()
|
||||
|
||||
nc2 := createClientConnSubscribeAndPublish(t)
|
||||
nc2 := createClientConnSubscribeAndPublish(t, s)
|
||||
defer nc2.Close()
|
||||
nc2.Flush()
|
||||
|
||||
pollConz := func() *Connz {
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?subs=1")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -456,7 +458,7 @@ func TestConnzWithOffsetAndLimit(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
|
||||
// Test that offset and limit ok when not enough connections
|
||||
resp, err := http.Get(url + "connz?offset=1&limit=1")
|
||||
@@ -505,10 +507,10 @@ func TestConnzWithOffsetAndLimit(t *testing.T) {
|
||||
DefaultConnListSize, c.Offset, c.Limit)
|
||||
}
|
||||
|
||||
cl1 := createClientConnSubscribeAndPublish(t)
|
||||
cl1 := createClientConnSubscribeAndPublish(t, s)
|
||||
defer cl1.Close()
|
||||
|
||||
cl2 := createClientConnSubscribeAndPublish(t)
|
||||
cl2 := createClientConnSubscribeAndPublish(t, s)
|
||||
defer cl2.Close()
|
||||
|
||||
resp, err = http.Get(url + "connz?offset=1&limit=1")
|
||||
@@ -590,15 +592,16 @@ func TestConnzWithOffsetAndLimit(t *testing.T) {
|
||||
|
||||
func TestConnzDefaultSorted(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
s.ConfigureLogger()
|
||||
defer s.Shutdown()
|
||||
|
||||
clients := make([]*nats.Conn, 4)
|
||||
for i := range clients {
|
||||
clients[i] = createClientConnSubscribeAndPublish(t)
|
||||
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
||||
defer clients[i].Close()
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -630,11 +633,11 @@ func TestConnzSortedByCid(t *testing.T) {
|
||||
|
||||
clients := make([]*nats.Conn, 4)
|
||||
for i := range clients {
|
||||
clients[i] = createClientConnSubscribeAndPublish(t)
|
||||
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
||||
defer clients[i].Close()
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?sort=cid")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -665,7 +668,7 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) {
|
||||
defer s.Shutdown()
|
||||
|
||||
// Create a connection and make it send more messages than others
|
||||
firstClient := createClientConnSubscribeAndPublish(t)
|
||||
firstClient := createClientConnSubscribeAndPublish(t, s)
|
||||
for i := 0; i < 100; i++ {
|
||||
firstClient.Publish("foo", []byte("Hello World"))
|
||||
}
|
||||
@@ -674,11 +677,11 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) {
|
||||
|
||||
clients := make([]*nats.Conn, 3)
|
||||
for i := range clients {
|
||||
clients[i] = createClientConnSubscribeAndPublish(t)
|
||||
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
||||
defer clients[i].Close()
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?sort=bytes_to")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -704,7 +707,7 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) {
|
||||
c.Conns[0].OutBytes, c.Conns[1].OutBytes, c.Conns[2].OutBytes, c.Conns[3].OutBytes)
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url = fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err = http.Get(url + "connz?sort=msgs_to")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -730,7 +733,7 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) {
|
||||
c.Conns[0].OutMsgs, c.Conns[1].OutMsgs, c.Conns[2].OutMsgs, c.Conns[3].OutMsgs)
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url = fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err = http.Get(url + "connz?sort=bytes_from")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -756,7 +759,7 @@ func TestConnzSortedByBytesAndMsgs(t *testing.T) {
|
||||
c.Conns[0].InBytes, c.Conns[1].InBytes, c.Conns[2].InBytes, c.Conns[3].InBytes)
|
||||
}
|
||||
|
||||
url = fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url = fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err = http.Get(url + "connz?sort=msgs_from")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -787,16 +790,16 @@ func TestConnzSortedByPending(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
firstClient := createClientConnSubscribeAndPublish(t)
|
||||
firstClient := createClientConnSubscribeAndPublish(t, s)
|
||||
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
|
||||
clients := make([]*nats.Conn, 3)
|
||||
for i := range clients {
|
||||
clients[i] = createClientConnSubscribeAndPublish(t)
|
||||
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
||||
defer clients[i].Close()
|
||||
}
|
||||
defer firstClient.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?sort=pending")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -827,16 +830,16 @@ func TestConnzSortedBySubs(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
firstClient := createClientConnSubscribeAndPublish(t)
|
||||
firstClient := createClientConnSubscribeAndPublish(t, s)
|
||||
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
|
||||
clients := make([]*nats.Conn, 3)
|
||||
for i := range clients {
|
||||
clients[i] = createClientConnSubscribeAndPublish(t)
|
||||
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
||||
defer clients[i].Close()
|
||||
}
|
||||
defer firstClient.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?sort=subs")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -864,22 +867,23 @@ func TestConnzSortedBySubs(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConnzSortedByLast(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
opts := DefaultMonitorOptions()
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
firstClient := createClientConnSubscribeAndPublish(t)
|
||||
firstClient := createClientConnSubscribeAndPublish(t, s)
|
||||
defer firstClient.Close()
|
||||
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
|
||||
firstClient.Flush()
|
||||
|
||||
clients := make([]*nats.Conn, 3)
|
||||
for i := range clients {
|
||||
clients[i] = createClientConnSubscribeAndPublish(t)
|
||||
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
||||
defer clients[i].Close()
|
||||
clients[i].Flush()
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?sort=last")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -912,12 +916,12 @@ func TestConnzSortedByUptime(t *testing.T) {
|
||||
|
||||
clients := make([]*nats.Conn, 5)
|
||||
for i := range clients {
|
||||
clients[i] = createClientConnSubscribeAndPublish(t)
|
||||
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
||||
defer clients[i].Close()
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?sort=uptime")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -949,12 +953,12 @@ func TestConnzSortedByIdle(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
firstClient := createClientConnSubscribeAndPublish(t)
|
||||
firstClient := createClientConnSubscribeAndPublish(t, s)
|
||||
defer firstClient.Close()
|
||||
firstClient.Subscribe("client.1", func(m *nats.Msg) {})
|
||||
firstClient.Flush()
|
||||
|
||||
secondClient := createClientConnSubscribeAndPublish(t)
|
||||
secondClient := createClientConnSubscribeAndPublish(t, s)
|
||||
defer secondClient.Close()
|
||||
secondClient.Subscribe("client.2", func(m *nats.Msg) {})
|
||||
secondClient.Flush()
|
||||
@@ -963,7 +967,7 @@ func TestConnzSortedByIdle(t *testing.T) {
|
||||
time.Sleep(time.Second)
|
||||
firstClient.Publish("client.1", []byte("new message"))
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?sort=idle")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -1011,16 +1015,16 @@ func TestConnzSortBadRequest(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
firstClient := createClientConnSubscribeAndPublish(t)
|
||||
firstClient := createClientConnSubscribeAndPublish(t, s)
|
||||
firstClient.Subscribe("hello.world", func(m *nats.Msg) {})
|
||||
clients := make([]*nats.Conn, 3)
|
||||
for i := range clients {
|
||||
clients[i] = createClientConnSubscribeAndPublish(t)
|
||||
clients[i] = createClientConnSubscribeAndPublish(t, s)
|
||||
defer clients[i].Close()
|
||||
}
|
||||
defer firstClient.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz?sort=foo")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -1032,28 +1036,34 @@ func TestConnzSortBadRequest(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConnzWithRoutes(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
|
||||
opts := DefaultMonitorOptions()
|
||||
opts.Cluster.Host = "localhost"
|
||||
opts.Cluster.Port = CLUSTER_PORT
|
||||
|
||||
s := RunServer(opts)
|
||||
|
||||
defer s.Shutdown()
|
||||
|
||||
var opts = Options{
|
||||
opts = &Options{
|
||||
Host: "localhost",
|
||||
Port: CLIENT_PORT + 1,
|
||||
Port: -1,
|
||||
Cluster: ClusterOpts{
|
||||
Host: "localhost",
|
||||
Port: CLUSTER_PORT + 1,
|
||||
Port: -1,
|
||||
},
|
||||
NoLog: true,
|
||||
NoSigs: true,
|
||||
}
|
||||
routeURL, _ := url.Parse(fmt.Sprintf("nats-route://127.0.0.1:%d", CLUSTER_PORT))
|
||||
routeURL, _ := url.Parse(fmt.Sprintf("nats-route://127.0.0.1:%d", s.ClusterAddr().(*net.TCPAddr).Port))
|
||||
opts.Routes = []*url.URL{routeURL}
|
||||
|
||||
sc := RunServer(&opts)
|
||||
sc := RunServer(opts)
|
||||
defer sc.Shutdown()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -1086,7 +1096,7 @@ func TestConnzWithRoutes(t *testing.T) {
|
||||
}
|
||||
|
||||
// Now check routez
|
||||
url = fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url = fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err = http.Get(url + "routez")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -1120,7 +1130,7 @@ func TestConnzWithRoutes(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test JSONP
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "routez?callback=callback")
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "routez?callback=callback")
|
||||
if errj != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
}
|
||||
@@ -1135,10 +1145,10 @@ func TestSubsz(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
nc := createClientConnSubscribeAndPublish(t)
|
||||
nc := createClientConnSubscribeAndPublish(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "subscriptionsz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -1171,7 +1181,7 @@ func TestSubsz(t *testing.T) {
|
||||
}
|
||||
|
||||
// Test JSONP
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "subscriptionsz?callback=callback")
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "subscriptionsz?callback=callback")
|
||||
ct = respj.Header.Get("Content-Type")
|
||||
if errj != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -1187,10 +1197,10 @@ func TestHandleRoot(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
nc := createClientConnSubscribeAndPublish(t)
|
||||
nc := createClientConnSubscribeAndPublish(t, s)
|
||||
defer nc.Close()
|
||||
|
||||
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT))
|
||||
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
}
|
||||
@@ -1220,10 +1230,10 @@ func TestConnzWithNamedClient(t *testing.T) {
|
||||
defer s.Shutdown()
|
||||
|
||||
clientName := "test-client"
|
||||
nc := createClientConnWithName(t, clientName)
|
||||
nc := createClientConnWithName(t, clientName, s)
|
||||
defer nc.Close()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "connz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -1260,10 +1270,13 @@ func TestConnzWithNamedClient(t *testing.T) {
|
||||
}
|
||||
|
||||
// Create a connection to test ConnInfo
|
||||
func createClientConnSubscribeAndPublish(t *testing.T) *nats.Conn {
|
||||
nc, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", CLIENT_PORT))
|
||||
func createClientConnSubscribeAndPublish(t *testing.T, s *Server) *nats.Conn {
|
||||
natsUrl := fmt.Sprintf("nats://127.0.0.1:%d", s.Addr().(*net.TCPAddr).Port)
|
||||
client := nats.DefaultOptions
|
||||
client.Servers = []string{natsUrl}
|
||||
nc, err := client.Connect()
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v\n", err)
|
||||
t.Fatalf("Error creating client: %v to: %s\n", err, natsUrl)
|
||||
}
|
||||
|
||||
ch := make(chan bool)
|
||||
@@ -1274,8 +1287,8 @@ func createClientConnSubscribeAndPublish(t *testing.T) *nats.Conn {
|
||||
return nc
|
||||
}
|
||||
|
||||
func createClientConnWithName(t *testing.T, name string) *nats.Conn {
|
||||
natsURI := fmt.Sprintf("nats://localhost:%d", CLIENT_PORT)
|
||||
func createClientConnWithName(t *testing.T, name string, s *Server) *nats.Conn {
|
||||
natsURI := fmt.Sprintf("nats://localhost:%d", s.Addr().(*net.TCPAddr).Port)
|
||||
|
||||
client := nats.DefaultOptions
|
||||
client.Servers = []string{natsURI}
|
||||
@@ -1292,7 +1305,7 @@ func TestStacksz(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
resp, err := http.Get(url + "stacksz")
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -1315,7 +1328,7 @@ func TestStacksz(t *testing.T) {
|
||||
t.Fatalf("Result does not seem to contain server's stacks:\n%v", str)
|
||||
}
|
||||
// Test JSONP
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", MONITOR_PORT) + "subscriptionsz?callback=callback")
|
||||
respj, errj := http.Get(fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port) + "subscriptionsz?callback=callback")
|
||||
ct = respj.Header.Get("Content-Type")
|
||||
if errj != nil {
|
||||
t.Fatalf("Expected no error: Got %v\n", err)
|
||||
@@ -1330,7 +1343,7 @@ func TestConcurrentMonitoring(t *testing.T) {
|
||||
s := runMonitorServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("http://localhost:%d/", MONITOR_PORT)
|
||||
url := fmt.Sprintf("http://localhost:%d/", s.MonitorAddr().(*net.TCPAddr).Port)
|
||||
// Get some endpoints. Make sure we have at least varz,
|
||||
// and the more the merrier.
|
||||
endpoints := []string{"varz", "varz", "varz", "connz", "connz", "subsz", "subsz", "routez", "routez"}
|
||||
|
||||
@@ -123,9 +123,23 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
|
||||
case "tlstimeout":
|
||||
// TLSTimeout change is picked up when Options is swapped.
|
||||
continue
|
||||
case "port":
|
||||
// check to see if newValue == 0 and continue if so.
|
||||
if newValue == 0 {
|
||||
// ignore RANDOM_PORT
|
||||
continue
|
||||
} else {
|
||||
return nil, fmt.Errorf("Config reload not supported for %s", field.Name)
|
||||
}
|
||||
case "http_port", "https_port":
|
||||
// check to see if newValue == -1 (RANDOM_PORT for http/https monitoring port)
|
||||
if newValue == -1 {
|
||||
continue
|
||||
}
|
||||
fallthrough
|
||||
default:
|
||||
// Bail out if attempting to reload any unsupported options.
|
||||
return nil, fmt.Errorf("Config reload not supported for %s", field.Name)
|
||||
return nil, fmt.Errorf("Config reload not supported for %s: old=%v, new=%v", field.Name, oldValue, newValue)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/go-nats"
|
||||
"net"
|
||||
)
|
||||
|
||||
// Ensure Reload returns an error when attempting to reload a server that did
|
||||
@@ -225,7 +226,7 @@ func TestConfigReloadRotateTLS(t *testing.T) {
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/tls_test.conf", config); err != nil {
|
||||
if err := os.Symlink("./configs/reload/tls_test.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
@@ -239,7 +240,7 @@ func TestConfigReloadRotateTLS(t *testing.T) {
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port)
|
||||
nc, err := nats.Connect(addr, nats.Secure())
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
@@ -255,7 +256,7 @@ func TestConfigReloadRotateTLS(t *testing.T) {
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/tls_verify_test.conf", config); err != nil {
|
||||
if err := os.Symlink("./configs/reload/tls_verify_test.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
@@ -298,7 +299,7 @@ func TestConfigReloadEnableTLS(t *testing.T) {
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/basic.conf", config); err != nil {
|
||||
if err := os.Symlink("./configs/reload/basic.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
@@ -312,7 +313,7 @@ func TestConfigReloadEnableTLS(t *testing.T) {
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port)
|
||||
nc, err := nats.Connect(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
@@ -323,7 +324,7 @@ func TestConfigReloadEnableTLS(t *testing.T) {
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/tls_test.conf", config); err != nil {
|
||||
if err := os.Symlink("./configs/reload/tls_test.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
@@ -354,7 +355,7 @@ func TestConfigReloadDisableTLS(t *testing.T) {
|
||||
}
|
||||
config := filepath.Join(dir, "tmp.conf")
|
||||
|
||||
if err := os.Symlink("./configs/tls_test.conf", config); err != nil {
|
||||
if err := os.Symlink("./configs/reload/tls_test.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
defer os.Remove(config)
|
||||
@@ -368,7 +369,7 @@ func TestConfigReloadDisableTLS(t *testing.T) {
|
||||
defer server.Shutdown()
|
||||
|
||||
// Ensure we can connect as a sanity check.
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, server.Addr().(*net.TCPAddr).Port)
|
||||
nc, err := nats.Connect(addr, nats.Secure())
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
@@ -379,7 +380,7 @@ func TestConfigReloadDisableTLS(t *testing.T) {
|
||||
if err := os.Remove(config); err != nil {
|
||||
t.Fatalf("Error deleting symlink: %v", err)
|
||||
}
|
||||
if err := os.Symlink("./configs/basic.conf", config); err != nil {
|
||||
if err := os.Symlink("./configs/reload/basic.conf", config); err != nil {
|
||||
t.Fatalf("Error creating symlink: %v", err)
|
||||
}
|
||||
if err := server.Reload(); err != nil {
|
||||
|
||||
@@ -602,7 +602,19 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
hp := net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(opts.Cluster.Port))
|
||||
// Get all possible URLs (when server listens to 0.0.0.0).
|
||||
// This is going to be sent to other Servers, so that they can let their
|
||||
// clients know about us.
|
||||
clientConnectURLs := s.getClientConnectURLs()
|
||||
|
||||
// Snapshot server options.
|
||||
port := opts.Cluster.Port
|
||||
|
||||
if port == -1 {
|
||||
port = 0
|
||||
}
|
||||
|
||||
hp := net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(port))
|
||||
s.Noticef("Listening for route connections on %s", hp)
|
||||
l, e := net.Listen("tcp", hp)
|
||||
if e != nil {
|
||||
@@ -612,6 +624,28 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
|
||||
return
|
||||
}
|
||||
|
||||
// Check for TLSConfig
|
||||
tlsReq := opts.Cluster.TLSConfig != nil
|
||||
info := Info{
|
||||
ID: s.info.ID,
|
||||
Version: s.info.Version,
|
||||
Host: opts.Cluster.Host,
|
||||
Port: l.Addr().(*net.TCPAddr).Port,
|
||||
AuthRequired: false,
|
||||
TLSRequired: tlsReq,
|
||||
SSLRequired: tlsReq,
|
||||
TLSVerify: tlsReq,
|
||||
MaxPayload: s.info.MaxPayload,
|
||||
ClientConnectURLs: clientConnectURLs,
|
||||
}
|
||||
// Check for Auth items
|
||||
if opts.Cluster.Username != "" {
|
||||
info.AuthRequired = true
|
||||
}
|
||||
s.routeInfo = info
|
||||
b, _ := json.Marshal(info)
|
||||
s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b))
|
||||
|
||||
// Setup state that can enable shutdown
|
||||
s.mu.Lock()
|
||||
s.routeListener = l
|
||||
@@ -657,36 +691,6 @@ func (s *Server) StartRouting(clientListenReady chan struct{}) {
|
||||
// the possible ephemeral port to be selected.
|
||||
<-clientListenReady
|
||||
|
||||
// Get all possible URLs (when server listens to 0.0.0.0).
|
||||
// This is going to be sent to other Servers, so that they can let their
|
||||
// clients know about us.
|
||||
clientConnectURLs := s.getClientConnectURLs()
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
// Check for TLSConfig
|
||||
tlsReq := opts.Cluster.TLSConfig != nil
|
||||
info := Info{
|
||||
ID: s.info.ID,
|
||||
Version: s.info.Version,
|
||||
Host: opts.Cluster.Host,
|
||||
Port: opts.Cluster.Port,
|
||||
AuthRequired: false,
|
||||
TLSRequired: tlsReq,
|
||||
SSLRequired: tlsReq,
|
||||
TLSVerify: tlsReq,
|
||||
MaxPayload: s.info.MaxPayload,
|
||||
ClientConnectURLs: clientConnectURLs,
|
||||
}
|
||||
// Check for Auth items
|
||||
if opts.Cluster.Username != "" {
|
||||
info.AuthRequired = true
|
||||
}
|
||||
s.routeInfo = info
|
||||
b, _ := json.Marshal(info)
|
||||
s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b))
|
||||
|
||||
// Spin up the accept loop
|
||||
ch := make(chan struct{})
|
||||
go s.routeAcceptLoop(ch)
|
||||
|
||||
@@ -172,9 +172,9 @@ func checkClusterFormed(t *testing.T, servers ...*Server) {
|
||||
// Helper function to generate next opts to make sure no port conflicts etc.
|
||||
func nextServerOpts(opts *Options) *Options {
|
||||
nopts := *opts
|
||||
nopts.Port++
|
||||
nopts.Cluster.Port++
|
||||
nopts.HTTPPort++
|
||||
nopts.Port = -1
|
||||
nopts.Cluster.Port = -1
|
||||
nopts.HTTPPort = -1
|
||||
return &nopts
|
||||
}
|
||||
|
||||
@@ -187,12 +187,13 @@ func TestSeedSolicitWorks(t *testing.T) {
|
||||
defer srvSeed.Shutdown()
|
||||
|
||||
optsA := nextServerOpts(optsSeed)
|
||||
optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port))
|
||||
optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host,
|
||||
srvSeed.ClusterAddr().(*net.TCPAddr).Port))
|
||||
|
||||
srvA := RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port)
|
||||
urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, srvA.ClusterAddr().(*net.TCPAddr).Port)
|
||||
|
||||
nc1, err := nats.Connect(urlA)
|
||||
if err != nil {
|
||||
@@ -206,12 +207,13 @@ func TestSeedSolicitWorks(t *testing.T) {
|
||||
nc1.Flush()
|
||||
|
||||
optsB := nextServerOpts(optsA)
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port))
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host,
|
||||
srvSeed.ClusterAddr().(*net.TCPAddr).Port))
|
||||
|
||||
srvB := RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, srvB.ClusterAddr().(*net.TCPAddr).Port)
|
||||
|
||||
nc2, err := nats.Connect(urlB)
|
||||
if err != nil {
|
||||
@@ -239,13 +241,15 @@ func TestTLSSeedSolicitWorks(t *testing.T) {
|
||||
srvSeed := RunServer(optsSeed)
|
||||
defer srvSeed.Shutdown()
|
||||
|
||||
seedRouteUrl := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host,
|
||||
srvSeed.ClusterAddr().(*net.TCPAddr).Port)
|
||||
optsA := nextServerOpts(optsSeed)
|
||||
optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port))
|
||||
optsA.Routes = RoutesFromStr(seedRouteUrl)
|
||||
|
||||
srvA := RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, optsA.Port)
|
||||
urlA := fmt.Sprintf("nats://%s:%d/", optsA.Host, srvA.Addr().(*net.TCPAddr).Port)
|
||||
|
||||
nc1, err := nats.Connect(urlA)
|
||||
if err != nil {
|
||||
@@ -259,12 +263,12 @@ func TestTLSSeedSolicitWorks(t *testing.T) {
|
||||
nc1.Flush()
|
||||
|
||||
optsB := nextServerOpts(optsA)
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port))
|
||||
optsB.Routes = RoutesFromStr(seedRouteUrl)
|
||||
|
||||
srvB := RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, srvB.Addr().(*net.TCPAddr).Port)
|
||||
|
||||
nc2, err := nats.Connect(urlB)
|
||||
if err != nil {
|
||||
@@ -292,13 +296,15 @@ func TestChainedSolicitWorks(t *testing.T) {
|
||||
srvSeed := RunServer(optsSeed)
|
||||
defer srvSeed.Shutdown()
|
||||
|
||||
seedRouteUrl := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host,
|
||||
srvSeed.ClusterAddr().(*net.TCPAddr).Port)
|
||||
optsA := nextServerOpts(optsSeed)
|
||||
optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port))
|
||||
optsA.Routes = RoutesFromStr(seedRouteUrl)
|
||||
|
||||
srvA := RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, optsSeed.Port)
|
||||
urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, srvA.Addr().(*net.TCPAddr).Port)
|
||||
|
||||
nc1, err := nats.Connect(urlSeed)
|
||||
if err != nil {
|
||||
@@ -313,12 +319,13 @@ func TestChainedSolicitWorks(t *testing.T) {
|
||||
|
||||
optsB := nextServerOpts(optsA)
|
||||
// Server B connects to A
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host,
|
||||
srvA.ClusterAddr().(*net.TCPAddr).Port))
|
||||
|
||||
srvB := RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, srvB.Addr().(*net.TCPAddr).Port)
|
||||
|
||||
nc2, err := nats.Connect(urlB)
|
||||
if err != nil {
|
||||
@@ -346,13 +353,15 @@ func TestTLSChainedSolicitWorks(t *testing.T) {
|
||||
srvSeed := RunServer(optsSeed)
|
||||
defer srvSeed.Shutdown()
|
||||
|
||||
urlSeedRoute := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host,
|
||||
srvSeed.ClusterAddr().(*net.TCPAddr).Port)
|
||||
optsA := nextServerOpts(optsSeed)
|
||||
optsA.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port))
|
||||
optsA.Routes = RoutesFromStr(urlSeedRoute)
|
||||
|
||||
srvA := RunServer(optsA)
|
||||
defer srvA.Shutdown()
|
||||
|
||||
urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, optsSeed.Port)
|
||||
urlSeed := fmt.Sprintf("nats://%s:%d/", optsSeed.Host, srvSeed.Addr().(*net.TCPAddr).Port)
|
||||
|
||||
nc1, err := nats.Connect(urlSeed)
|
||||
if err != nil {
|
||||
@@ -367,12 +376,13 @@ func TestTLSChainedSolicitWorks(t *testing.T) {
|
||||
|
||||
optsB := nextServerOpts(optsA)
|
||||
// Server B connects to A
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host,
|
||||
srvA.ClusterAddr().(*net.TCPAddr).Port))
|
||||
|
||||
srvB := RunServer(optsB)
|
||||
defer srvB.Shutdown()
|
||||
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, optsB.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d/", optsB.Host, srvB.Addr().(*net.TCPAddr).Port)
|
||||
|
||||
nc2, err := nats.Connect(urlB)
|
||||
if err != nil {
|
||||
@@ -397,10 +407,10 @@ func TestRouteTLSHandshakeError(t *testing.T) {
|
||||
srvSeed := RunServer(optsSeed)
|
||||
defer srvSeed.Shutdown()
|
||||
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
opts.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, optsSeed.Cluster.Port))
|
||||
|
||||
srv := RunServer(&opts)
|
||||
srv := RunServer(opts)
|
||||
defer srv.Shutdown()
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
@@ -419,11 +429,11 @@ func TestRouteTLSHandshakeError(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBlockedShutdownOnRouteAcceptLoopFailure(t *testing.T) {
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
opts.Cluster.Host = "x.x.x.x"
|
||||
opts.Cluster.Port = 7222
|
||||
|
||||
s := New(&opts)
|
||||
s := New(opts)
|
||||
go s.Start()
|
||||
// Wait a second
|
||||
time.Sleep(time.Second)
|
||||
@@ -443,7 +453,7 @@ func TestBlockedShutdownOnRouteAcceptLoopFailure(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRouteUseIPv6(t *testing.T) {
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
opts.Cluster.Host = "::"
|
||||
opts.Cluster.Port = 6222
|
||||
|
||||
@@ -456,7 +466,7 @@ func TestRouteUseIPv6(t *testing.T) {
|
||||
t.Skipf("Skipping this test since there is no IPv6 support on this host: %v", err)
|
||||
}
|
||||
|
||||
s := RunServer(&opts)
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
routeUp := false
|
||||
@@ -484,16 +494,18 @@ func TestRouteUseIPv6(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestClientConnectToRoutePort(t *testing.T) {
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
|
||||
// Since client will first connect to the route listen port, set the
|
||||
// cluster's Host to localhost so it works on Windows too, since on
|
||||
// Windows, a client can't use 0.0.0.0 in a connect.
|
||||
opts.Cluster.Host = "localhost"
|
||||
opts.Cluster.NoAdvertise = true
|
||||
s := RunServer(&opts)
|
||||
defer s.Shutdown()
|
||||
s := RunServer(opts)
|
||||
s.Noticef("%+v\n", opts)
|
||||
// defer s.Shutdown()
|
||||
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, opts.Cluster.Port)
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, s.ClusterAddr().(*net.TCPAddr).Port)
|
||||
clientURL := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
// When connecting to the ROUTE port, the client library will receive the
|
||||
// CLIENT port in the INFO protocol. This URL is added to the client's pool
|
||||
@@ -504,6 +516,7 @@ func TestClientConnectToRoutePort(t *testing.T) {
|
||||
// attempts rather small.
|
||||
total := 10
|
||||
for i := 0; i < total; i++ {
|
||||
s.Noticef("URL: %s", url)
|
||||
nc, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexepected error on connect: %v", err)
|
||||
|
||||
@@ -68,6 +68,7 @@ type Server struct {
|
||||
start time.Time
|
||||
http net.Listener
|
||||
httpHandler http.Handler
|
||||
profiler net.Listener
|
||||
httpReqStats map[string]uint64
|
||||
routeListener net.Listener
|
||||
routeInfo Info
|
||||
@@ -78,6 +79,7 @@ type Server struct {
|
||||
grRunning bool
|
||||
grWG sync.WaitGroup // to wait on various go routines
|
||||
cproto int64 // number of clients supporting async INFO
|
||||
fatalError string // Captures the error string for any fatal error
|
||||
logging struct {
|
||||
sync.RWMutex
|
||||
logger Logger
|
||||
@@ -329,6 +331,12 @@ func (s *Server) Shutdown() {
|
||||
s.http = nil
|
||||
}
|
||||
|
||||
// Kick Profiling if its running
|
||||
if s.profiler != nil {
|
||||
doneExpected++
|
||||
s.profiler.Close()
|
||||
}
|
||||
|
||||
// Release the solicited routes connect go routines.
|
||||
close(s.rcQuit)
|
||||
|
||||
@@ -363,12 +371,12 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
opts := s.getOpts()
|
||||
|
||||
hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
|
||||
s.Noticef("Listening for client connections on %s", hp)
|
||||
l, e := net.Listen("tcp", hp)
|
||||
if e != nil {
|
||||
s.Fatalf("Error listening on port: %s, %q", hp, e)
|
||||
return
|
||||
}
|
||||
s.Noticef("Listening for client connections on %s", l.Addr().String())
|
||||
|
||||
// Alert of TLS enabled.
|
||||
if opts.TLSConfig != nil {
|
||||
@@ -436,16 +444,43 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
|
||||
|
||||
// StartProfiler is called to enable dynamic profiling.
|
||||
func (s *Server) StartProfiler() {
|
||||
s.Noticef("in StartProfiler %s", "HOW!?!?")
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
s.Noticef(opts.HTTPHost)
|
||||
|
||||
s.Noticef("Starting profiling on http port %d", opts.ProfPort)
|
||||
hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.ProfPort))
|
||||
go func() {
|
||||
err := http.ListenAndServe(hp, nil)
|
||||
if err != nil {
|
||||
s.Fatalf("error starting monitor server: %s", err)
|
||||
port := opts.ProfPort
|
||||
|
||||
// Check for Random Port
|
||||
if port == -1 {
|
||||
port = 0
|
||||
}
|
||||
|
||||
hp := net.JoinHostPort(opts.Host, strconv.Itoa(port))
|
||||
|
||||
l, err := net.Listen("tcp", hp)
|
||||
s.Noticef("profiling port: %d", l.Addr().(*net.TCPAddr).Port)
|
||||
|
||||
if err != nil {
|
||||
s.Fatalf("error starting profiler: %s", err)
|
||||
}
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: hp,
|
||||
Handler: http.DefaultServeMux,
|
||||
ReadTimeout: 2 * time.Second,
|
||||
WriteTimeout: 2 * time.Second,
|
||||
MaxHeaderBytes: 1 << 20,
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.profiler = l
|
||||
s.mu.Unlock()
|
||||
|
||||
go func() {
|
||||
// if this errors out, it's probably because the server is being shutdown
|
||||
srv.Serve(l)
|
||||
s.done <- true
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -889,6 +924,16 @@ func (s *Server) MonitorAddr() net.Addr {
|
||||
return s.http.Addr()
|
||||
}
|
||||
|
||||
// RouteAddr returns the net.Addr object for the route listener.
|
||||
func (s *Server) ClusterAddr() net.Addr {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.routeListener == nil {
|
||||
return nil
|
||||
}
|
||||
return s.routeListener.Addr()
|
||||
}
|
||||
|
||||
// ReadyForConnections returns `true` if the server is ready to accept client
|
||||
// and, if routing is enabled, route connections. If after the duration
|
||||
// `dur` the server is still not ready, returns `false`.
|
||||
@@ -977,3 +1022,8 @@ func (s *Server) getClientConnectURLs() []string {
|
||||
}
|
||||
return urls
|
||||
}
|
||||
|
||||
// Return startup error, if there is one. Useful for unit testing.
|
||||
func (s *Server) FatalError() string {
|
||||
return s.fatalError
|
||||
}
|
||||
|
||||
@@ -13,26 +13,35 @@ import (
|
||||
"github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
var DefaultOptions = Options{
|
||||
func DefaultOptions() *Options {
|
||||
return &Options{
|
||||
Host: "localhost",
|
||||
Port: 11222,
|
||||
HTTPPort: 11333,
|
||||
Cluster: ClusterOpts{Port: 11444},
|
||||
ProfPort: 11280,
|
||||
Port: -1,
|
||||
HTTPPort: -1,
|
||||
Cluster: ClusterOpts{Port: -1},
|
||||
ProfPort: -1,
|
||||
NoLog: true,
|
||||
NoSigs: true,
|
||||
Debug: true,
|
||||
Trace: true,
|
||||
}
|
||||
}
|
||||
|
||||
// New Go Routine based server
|
||||
func RunServer(opts *Options) *Server {
|
||||
if opts == nil {
|
||||
opts = &DefaultOptions
|
||||
opts = DefaultOptions()
|
||||
}
|
||||
s := New(opts)
|
||||
|
||||
if s == nil {
|
||||
panic("No NATS Server object returned.")
|
||||
}
|
||||
|
||||
if !opts.NoLog {
|
||||
s.ConfigureLogger()
|
||||
}
|
||||
|
||||
// Run server in Go routine.
|
||||
go s.Start()
|
||||
|
||||
@@ -43,8 +52,19 @@ func RunServer(opts *Options) *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
func TestStartProfiler(t *testing.T) {
|
||||
s := New(DefaultOptions())
|
||||
s.StartProfiler()
|
||||
s.mu.Lock()
|
||||
s.profiler.Close()
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestStartupAndShutdown(t *testing.T) {
|
||||
s := RunServer(&DefaultOptions)
|
||||
|
||||
opts := DefaultOptions()
|
||||
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
if !s.isRunning() {
|
||||
@@ -125,13 +145,13 @@ func TestTlsCipher(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetConnectURLs(t *testing.T) {
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
opts.Port = 4222
|
||||
|
||||
var globalIP net.IP
|
||||
|
||||
checkGlobalConnectURLs := func() {
|
||||
s := New(&opts)
|
||||
s := New(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
urls := s.getClientConnectURLs()
|
||||
@@ -167,7 +187,7 @@ func TestGetConnectURLs(t *testing.T) {
|
||||
}
|
||||
|
||||
checkConnectURLsHasOnlyOne := func() {
|
||||
s := New(&opts)
|
||||
s := New(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
urls := s.getClientConnectURLs()
|
||||
@@ -195,24 +215,27 @@ func TestGetConnectURLs(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNoDeadlockOnStartFailure(t *testing.T) {
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
opts.Host = "x.x.x.x" // bad host
|
||||
opts.Port = 4222
|
||||
opts.HTTPHost = opts.Host
|
||||
opts.Cluster.Host = "localhost"
|
||||
opts.Cluster.Port = 6222
|
||||
opts.Cluster.Port = -1
|
||||
opts.ProfPort = -1
|
||||
s := New(opts)
|
||||
|
||||
s := New(&opts)
|
||||
// This should return since it should fail to start a listener
|
||||
// on x.x.x.x:4222
|
||||
s.Start()
|
||||
|
||||
// We should be able to shutdown
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func TestMaxConnections(t *testing.T) {
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
opts.MaxConn = 1
|
||||
s := RunServer(&opts)
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
addr := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
@@ -277,10 +300,9 @@ func TestProcessCommandLineArgs(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWriteDeadline(t *testing.T) {
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
opts.WriteDeadline = 20 * time.Millisecond
|
||||
opts.NoLog = false
|
||||
s := RunServer(&opts)
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", opts.Host, opts.Port), 3*time.Second)
|
||||
@@ -331,10 +353,10 @@ func TestWriteDeadline(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRandomPorts(t *testing.T) {
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
opts.HTTPPort = -1
|
||||
opts.Port = -1
|
||||
s := RunServer(&opts)
|
||||
s := RunServer(opts)
|
||||
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -353,10 +375,10 @@ func TestRandomPorts(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNilMonitoringPort(t *testing.T) {
|
||||
opts := DefaultOptions
|
||||
opts := DefaultOptions()
|
||||
opts.HTTPPort = 0
|
||||
opts.HTTPSPort = 0
|
||||
s := RunServer(&opts)
|
||||
s := RunServer(opts)
|
||||
|
||||
defer s.Shutdown()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user