Major updates + support for config reload of client/cluster advertise

This commit is contained in:
Ivan Kozlovic
2018-02-05 20:15:36 -07:00
parent 306a3f9507
commit acf4a31e4b
13 changed files with 547 additions and 105 deletions

View File

@@ -152,6 +152,7 @@ Server Options:
-ms,--https_port <port> Use port for https monitoring
-c, --config <file> Configuration file
-sl,--signal <signal>[=<pid>] Send signal to gnatsd process (stop, quit, reopen, reload)
--client_advertise <string> Client URL to advertise to other servers
Logging Options:
-l, --log <file> File to redirect log output
@@ -178,6 +179,7 @@ Cluster Options:
--routes <rurl-1, rurl-2> Routes to solicit and connect
--cluster <cluster-url> Cluster URL for solicited routes
--no_advertise <bool> Advertise known cluster IPs to clients
--cluster_advertise <string> Cluster URL to advertise to other servers
--connect_retries <number> For implicit routes, number of connect retries
@@ -286,6 +288,23 @@ The `--routes` flag specifies the NATS URL for one or more servers in the cluste
Previous releases required you to build the complete mesh using the `--routes` flag. To define your cluster in the current release, please follow the "Basic example" as described below.
Suppose that server srvA is connected to server srvB. A bi-directional route exists between srvA and srvB. A new server, srvC, connects to srvA.<br>
When accepting the connection, srvA will gossip the address of srvC to srvB so that srvB connects to srvC, completing the full mesh.<br>
The URL that srvB will use to connect to srvC is the result of the TCP remote address that srvA got from its connection to srvC.
It is possible to advertise with `--cluster_advertise` a different address than the one used in `--cluster`.
In the previous example, if srvC uses a `--cluster_adertise` URL, this is what srvA will gossip to srvB in order to connect to srvC.
NOTE: The advertise address should really result in a connection to srvC. Providing an address that would result in a connection to a different NATS Server would prevent the formation of a full-mesh cluster!
As part of the gossip protocol, a server will also send to the other servers the URL clients should connect to.<br>
The URL is the one defined in the `listen` parameter, or, if 0.0.0.0 or :: is specified, the resolved non-local IP addresses for the "any" interface.
If those addresses are not reacheable from the outside world where the clients are running, the administrator can use the `--no_advertise` option to disable servers gossiping those URLs.<br>
Another option is to provide a `--client_advertise` URL to use instead. If this option is specified (and advertise has not been disabled), then the server will advertise this URL to other servers instead of its `listen` address (or resolved IPs when listen is 0.0.0.0 or ::).
### Basic example
NATS makes building the full mesh easy. Simply designate a server to be a *seed* server. All other servers in the cluster simply specify the *seed* server as its server's routes option as indicated below.

View File

@@ -21,6 +21,7 @@ Server Options:
-ms,--https_port <port> Use port for https monitoring
-c, --config <file> Configuration file
-sl,--signal <signal>[=<pid>] Send signal to gnatsd process (stop, quit, reopen, reload)
--client_advertise <string> Client URL to advertise to other servers
Logging Options:
-l, --log <file> File to redirect log output
@@ -47,6 +48,7 @@ Cluster Options:
--routes <rurl-1, rurl-2> Routes to solicit and connect
--cluster <cluster-url> Cluster URL for solicited routes
--no_advertise <bool> Advertise known cluster IPs to clients
--cluster_advertise <string> Cluster URL to advertise to other servers
--connect_retries <number> For implicit routes, number of connect retries

View File

@@ -8,6 +8,7 @@ import (
"os"
"runtime"
"strings"
"sync"
"testing"
"github.com/nats-io/gnatsd/logger"
@@ -64,28 +65,41 @@ func TestSetLogger(t *testing.T) {
}
type DummyLogger struct {
sync.Mutex
msg string
}
func (dl *DummyLogger) checkContent(t *testing.T, expectedStr string) {
if dl.msg != expectedStr {
stackFatalf(t, "Expected log to be: %v, got %v", expectedStr, dl.msg)
func (l *DummyLogger) checkContent(t *testing.T, expectedStr string) {
l.Lock()
defer l.Unlock()
if l.msg != expectedStr {
stackFatalf(t, "Expected log to be: %v, got %v", expectedStr, l.msg)
}
}
func (l *DummyLogger) Noticef(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}
func (l *DummyLogger) Errorf(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}
func (l *DummyLogger) Fatalf(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}
func (l *DummyLogger) Debugf(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}
func (l *DummyLogger) Tracef(format string, v ...interface{}) {
l.Lock()
defer l.Unlock()
l.msg = fmt.Sprintf(format, v...)
}

View File

@@ -30,7 +30,7 @@ type ClusterOpts struct {
TLSTimeout float64 `json:"-"`
TLSConfig *tls.Config `json:"-"`
ListenStr string `json:"-"`
RouteAdvertise string `json:"-"`
Advertise string `json:"-"`
NoAdvertise bool `json:"-"`
ConnectRetries int `json:"-"`
}
@@ -391,8 +391,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
opts.Cluster.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert
opts.Cluster.TLSConfig.RootCAs = opts.Cluster.TLSConfig.ClientCAs
opts.Cluster.TLSTimeout = tc.Timeout
case "route_advertise":
opts.Cluster.RouteAdvertise = mv.(string)
case "cluster_advertise", "advertise":
opts.Cluster.Advertise = mv.(string)
case "no_advertise":
opts.Cluster.NoAdvertise = mv.(bool)
case "connect_retries":
@@ -768,6 +768,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
if flagOpts.Cluster.ConnectRetries != 0 {
opts.Cluster.ConnectRetries = flagOpts.Cluster.ConnectRetries
}
if flagOpts.Cluster.Advertise != "" {
opts.Cluster.Advertise = flagOpts.Cluster.Advertise
}
if flagOpts.RoutesStr != "" {
mergeRoutes(&opts, flagOpts)
}
@@ -951,7 +954,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.StringVar(&opts.Host, "addr", "", "Network host to listen on.")
fs.StringVar(&opts.Host, "a", "", "Network host to listen on.")
fs.StringVar(&opts.Host, "net", "", "Network host to listen on.")
fs.StringVar(&opts.ClientAdvertise, "client_advertise", "", "Client url for discovered servers.")
fs.StringVar(&opts.ClientAdvertise, "client_advertise", "", "Client URL to advertise to other servers.")
fs.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.")
fs.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.")
fs.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.")
@@ -984,7 +987,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.")
fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.")
fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.")
fs.StringVar(&opts.Cluster.RouteAdvertise, "route_advertise", "", "Cluster url(s) for discovered servers.")
fs.StringVar(&opts.Cluster.Advertise, "cluster_advertise", "", "Cluster URL to advertise to other servers.")
fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.")
fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries")
fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.")

View File

@@ -237,7 +237,7 @@ func (c *clusterOption) Apply(server *Server) {
server.routeInfo.SSLRequired = tlsRequired
server.routeInfo.TLSVerify = tlsRequired
server.routeInfo.AuthRequired = c.newValue.Username != ""
server.generateRouteInfoJSON()
server.setRouteInfoHostPortAndIP()
server.mu.Unlock()
server.Noticef("Reloaded: cluster")
}
@@ -407,6 +407,20 @@ func (w *writeDeadlineOption) Apply(server *Server) {
server.Noticef("Reloaded: write_deadline = %s", w.newValue)
}
// clientAdvertiseOption implements the option interface for the `client_advertise` setting.
type clientAdvertiseOption struct {
noopOption
newValue string
}
// Apply the setting by updating the server info and regenerate the infoJSON byte array.
func (c *clientAdvertiseOption) Apply(server *Server) {
server.mu.Lock()
server.setInfoHostPortAndGenerateJSON()
server.mu.Unlock()
server.Noticef("Reload: client_advertise = %s", c.newValue)
}
// Reload reads the current configuration file and applies any supported
// changes. This returns an error if the server was not started with a config
// file or an option which doesn't support hot-swapping was changed.
@@ -422,11 +436,25 @@ func (s *Server) Reload() error {
// TODO: Dump previous good config to a .bak file?
return err
}
clientOrgPort := s.clientActualPort
clusterOrgPort := s.clusterActualPort
s.mu.Unlock()
// Apply flags over config file settings.
newOpts = MergeOptions(newOpts, FlagSnapshot)
processOptions(newOpts)
// processOptions sets Port to 0 if set to -1 (RANDOM port)
// If that's the case, set it to the saved value when the accept loop was
// created.
if newOpts.Port == 0 {
newOpts.Port = clientOrgPort
}
// We don't do that for cluster, so check against -1.
if newOpts.Cluster.Port == -1 {
newOpts.Cluster.Port = clusterOrgPort
}
if err := s.reloadOptions(newOpts); err != nil {
return err
}
@@ -518,6 +546,15 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &maxPingsOutOption{newValue: newValue.(int)})
case "writedeadline":
diffOpts = append(diffOpts, &writeDeadlineOption{newValue: newValue.(time.Duration)})
case "clientadvertise":
cliAdv := newValue.(string)
if cliAdv != "" {
// Validate ClientAdvertise syntax
if _, _, err := parseHostPort(cliAdv, 0); err != nil {
return nil, fmt.Errorf("invalid ClientAdvertise value of %s, err=%v", cliAdv, err)
}
}
diffOpts = append(diffOpts, &clientAdvertiseOption{newValue: cliAdv})
case "nolog":
// Ignore NoLog option since it's not parsed and only used in
// testing.
@@ -612,6 +649,12 @@ func validateClusterOpts(old, new ClusterOpts) error {
return fmt.Errorf("Config reload not supported for cluster port: old=%d, new=%d",
old.Port, new.Port)
}
// Validate Cluster.Advertise syntax
if new.Advertise != "" {
if _, _, err := parseHostPort(new.Advertise, 0); err != nil {
return fmt.Errorf("invalid Cluster.Advertise value of %s, err=%v", new.Advertise, err)
}
}
return nil
}

View File

@@ -3,6 +3,7 @@
package server
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
@@ -187,7 +188,7 @@ func TestConfigReload(t *testing.T) {
if err := ioutil.WriteFile(platformConf, content, 0666); err != nil {
t.Fatalf("Unable to write config file: %v", err)
}
server, opts, config := newServerWithSymlinkConfig(t, "tmp.conf", "./configs/reload/test.conf")
server, opts, config := runServerWithSymlinkConfig(t, "tmp.conf", "./configs/reload/test.conf")
defer os.Remove(config)
defer server.Shutdown()
@@ -200,6 +201,7 @@ func TestConfigReload(t *testing.T) {
AuthTimeout: 1.0,
Debug: false,
Trace: false,
NoLog: true,
Logtime: false,
MaxControlLine: 1024,
MaxPayload: 1048576,
@@ -209,12 +211,12 @@ func TestConfigReload(t *testing.T) {
WriteDeadline: 2 * time.Second,
Cluster: ClusterOpts{
Host: "localhost",
Port: -1,
Port: server.ClusterAddr().Port,
},
}
processOptions(golden)
if !reflect.DeepEqual(golden, server.getOpts()) {
if !reflect.DeepEqual(golden, opts) {
t.Fatalf("Options are incorrect.\nexpected: %+v\ngot: %+v",
golden, opts)
}
@@ -1372,6 +1374,166 @@ func TestConfigReloadClusterRoutes(t *testing.T) {
}
}
func TestConfigReloadClusterAdvertise(t *testing.T) {
conf := "routeadv.conf"
if err := ioutil.WriteFile(conf, []byte(`
listen: "0.0.0.0:-1"
cluster: {
listen: "0.0.0.0:-1"
}
`), 0666); err != nil {
t.Fatalf("Error creating config file: %v", err)
}
defer os.Remove(conf)
opts, err := ProcessConfigFile(conf)
if err != nil {
t.Fatalf("Error processing config file: %v", err)
}
opts.NoLog = true
s := RunServer(opts)
defer s.Shutdown()
orgClusterPort := s.ClusterAddr().Port
updateConfig := func(content string) {
if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil {
stackFatalf(t, "Error creating config file: %v", err)
}
if err := s.Reload(); err != nil {
stackFatalf(t, "Error on reload: %v", err)
}
}
verify := func(expectedHost string, expectedPort int, expectedIP string) {
s.mu.Lock()
routeInfo := s.routeInfo
routeInfoJSON := Info{}
err = json.Unmarshal(s.routeInfoJSON[5:], &routeInfoJSON) // Skip "INFO "
s.mu.Unlock()
if err != nil {
t.Fatalf("Error on Unmarshal: %v", err)
}
if routeInfo.Host != expectedHost || routeInfo.Port != expectedPort || routeInfo.IP != expectedIP {
t.Fatalf("Expected host/port/IP to be %s:%v, %q, got %s:%d, %q",
expectedHost, expectedPort, expectedIP, routeInfo.Host, routeInfo.Port, routeInfo.IP)
}
// Check that server routeInfoJSON was updated too
if !reflect.DeepEqual(routeInfo, routeInfoJSON) {
t.Fatalf("Expected routeInfoJSON to be %+v, got %+v", routeInfo, routeInfoJSON)
}
}
// Update config with cluster_advertise
updateConfig(`
listen: "0.0.0.0:-1"
cluster: {
listen: "0.0.0.0:-1"
cluster_advertise: "me:1"
}
`)
verify("me", 1, "nats-route://me:1/")
// Update config with cluster_advertise (no port specified)
updateConfig(`
listen: "0.0.0.0:-1"
cluster: {
listen: "0.0.0.0:-1"
cluster_advertise: "me"
}
`)
verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
// Update config with cluster_advertise (-1 port specified)
updateConfig(`
listen: "0.0.0.0:-1"
cluster: {
listen: "0.0.0.0:-1"
cluster_advertise: "me:-1"
}
`)
verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
// Update to remove cluster_advertise
updateConfig(`
listen: "0.0.0.0:-1"
cluster: {
listen: "0.0.0.0:-1"
}
`)
verify("0.0.0.0", orgClusterPort, "")
}
func TestConfigReloadClientAdvertise(t *testing.T) {
conf := "clientadv.conf"
if err := ioutil.WriteFile(conf, []byte(`listen: "0.0.0.0:-1"`), 0666); err != nil {
t.Fatalf("Error creating config file: %v", err)
}
defer os.Remove(conf)
opts, err := ProcessConfigFile(conf)
if err != nil {
stackFatalf(t, "Error processing config file: %v", err)
}
opts.NoLog = true
s := RunServer(opts)
defer s.Shutdown()
orgPort := s.Addr().(*net.TCPAddr).Port
updateConfig := func(content string) {
if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil {
stackFatalf(t, "Error creating config file: %v", err)
}
if err := s.Reload(); err != nil {
stackFatalf(t, "Error on reload: %v", err)
}
}
verify := func(expectedHost string, expectedPort int) {
s.mu.Lock()
info := s.info
infoJSON := Info{clientConnectURLs: make(map[string]struct{})}
err := json.Unmarshal(s.infoJSON[5:len(s.infoJSON)-2], &infoJSON) // Skip INFO
s.mu.Unlock()
if err != nil {
stackFatalf(t, "Error on Unmarshal: %v", err)
}
if info.Host != expectedHost || info.Port != expectedPort {
stackFatalf(t, "Expected host/port to be %s:%d, got %s:%d",
expectedHost, expectedPort, info.Host, info.Port)
}
// Check that server infoJSON was updated too
if !reflect.DeepEqual(info, infoJSON) {
stackFatalf(t, "Expected infoJSON to be %+v, got %+v", info, infoJSON)
}
}
// Update config with ClientAdvertise (port specified)
updateConfig(`
listen: "0.0.0.0:-1"
client_advertise: "me:1"
`)
verify("me", 1)
// Update config with ClientAdvertise (no port specified)
updateConfig(`
listen: "0.0.0.0:-1"
client_advertise: "me"
`)
verify("me", orgPort)
// Update config with ClientAdvertise (-1 port specified)
updateConfig(`
listen: "0.0.0.0:-1"
client_advertise: "me:-1"
`)
verify("me", orgPort)
// Now remove ClientAdvertise to check that original values
// are restored.
updateConfig(`listen: "0.0.0.0:-1"`)
verify("0.0.0.0", orgPort)
}
// Ensure Reload supports changing the max connections. Test this by starting a
// server with no max connections, connecting two clients, reloading with a
// max connections of one, and ensuring one client is disconnected.

View File

@@ -143,7 +143,9 @@ func (c *client) processRouteInfo(info *Info) {
// Send our local subscriptions to this route.
s.sendLocalSubsToRoute(c)
if sendInfo {
// If IP isn't already set on info
// The incoming INFO from the route will have IP set
// if it has Cluster.Advertise. In that case, use that
// otherwise contruct it from the remote TCP address.
if info.IP == "" {
// Need to get the remote IP address.
c.mu.Lock()
@@ -614,6 +616,12 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
}
func (s *Server) routeAcceptLoop(ch chan struct{}) {
defer func() {
if ch != nil {
close(ch)
}
}()
// Snapshot server options.
opts := s.getOpts()
@@ -633,34 +641,16 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
s.Noticef("Listening for route connections on %s", hp)
l, e := net.Listen("tcp", hp)
if e != nil {
// We need to close this channel to avoid a deadlock
close(ch)
s.Fatalf("Error listening on router port: %d - %v", opts.Cluster.Port, e)
return
}
s.mu.Lock()
// Check for TLSConfig
tlsReq := opts.Cluster.TLSConfig != nil
// Configure Cluster Advertise Address
host := opts.Cluster.Host
port = l.Addr().(*net.TCPAddr).Port
ip := ""
if opts.Cluster.RouteAdvertise != "" {
advHost, advPort, err := parseHostPort(opts.Cluster.RouteAdvertise, port)
if err != nil {
s.Errorf("setting RouteAdvertise failed %v", err)
} else {
host = advHost
port = advPort
}
ip = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort)))
}
info := Info{
ID: s.info.ID,
Version: s.info.Version,
Host: host,
Port: port,
IP: ip,
AuthRequired: false,
TLSRequired: tlsReq,
SSLRequired: tlsReq,
@@ -668,20 +658,33 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
MaxPayload: s.info.MaxPayload,
ClientConnectURLs: clientConnectURLs,
}
// If we have selected a random port...
if port == 0 {
// Write resolved port back to options.
opts.Cluster.Port = l.Addr().(*net.TCPAddr).Port
}
// Keep track of actual listen port. This will be needed in case of
// config reload.
s.clusterActualPort = opts.Cluster.Port
// Check for Auth items
if opts.Cluster.Username != "" {
info.AuthRequired = true
}
s.routeInfo = info
s.generateRouteInfoJSON()
// Possibly override Host/Port and set IP based on Cluster.Advertise
if err := s.setRouteInfoHostPortAndIP(); err != nil {
s.Fatalf("Error setting route INFO with Cluster.Advertise value of %s, err=%v", s.opts.Cluster.Advertise, err)
l.Close()
s.mu.Unlock()
return
}
// Setup state that can enable shutdown
s.mu.Lock()
s.routeListener = l
s.mu.Unlock()
// Let them know we are up
close(ch)
ch = nil
tmpDelay := ACCEPT_MIN_SLEEP
@@ -711,6 +714,26 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
s.done <- true
}
// Similar to setInfoHostPortAndGenerateJSON, but for routeInfo.
func (s *Server) setRouteInfoHostPortAndIP() error {
if s.opts.Cluster.Advertise != "" {
advHost, advPort, err := parseHostPort(s.opts.Cluster.Advertise, s.opts.Cluster.Port)
if err != nil {
return err
}
s.routeInfo.Host = advHost
s.routeInfo.Port = advPort
s.routeInfo.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort)))
} else {
s.routeInfo.Host = s.opts.Cluster.Host
s.routeInfo.Port = s.opts.Cluster.Port
s.routeInfo.IP = ""
}
// (re)generate the routeInfoJSON byte array
s.generateRouteInfoJSON()
return nil
}
// StartRouting will start the accept loop on the cluster host:port
// and will actively try to connect to listed routes.
func (s *Server) StartRouting(clientListenReady chan struct{}) {

View File

@@ -3,8 +3,6 @@
package server
import (
"bufio"
"encoding/json"
"fmt"
"net"
"net/url"
@@ -55,55 +53,127 @@ func TestRouteConfig(t *testing.T) {
}
}
func TestRouteAdvertise(t *testing.T) {
// TODO: Need to work through this test case. may need to add a util proxy server
// to validate functionally.
optsSeed, _ := ProcessConfigFile("./configs/seed.conf")
func TestClusterAdvertise(t *testing.T) {
lst, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error starting listener: %v", err)
}
ch := make(chan error)
go func() {
c, err := lst.Accept()
if err != nil {
ch <- err
return
}
c.Close()
ch <- nil
}()
optsSeed.NoSigs, optsSeed.NoLog = true, true
srvSeed := RunServer(optsSeed)
defer srvSeed.Shutdown()
optsA, _ := ProcessConfigFile("./configs/seed.conf")
optsA.NoSigs, optsA.NoLog = true, true
srvA := RunServer(optsA)
defer srvA.Shutdown()
seedRouteUrl := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host,
srvSeed.ClusterAddr().Port)
optsA := nextServerOpts(optsSeed)
optsA.Routes = RoutesFromStr(seedRouteUrl)
optsA.Cluster.Port = 9999
optsA.Cluster.RouteAdvertise = "example.com:80"
srvARouteURL := fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, srvA.ClusterAddr().Port)
optsB := nextServerOpts(optsA)
optsB.Routes = RoutesFromStr(srvARouteURL)
srvB := RunServer(optsB)
defer srvB.Shutdown()
// Wait for these 2 to connect to each other
checkClusterFormed(t, srvA, srvB)
// Now start server C that connects to A. A should ask B to connect to C,
// based on C's URL. But since C configures a Cluster.Advertise, it will connect
// to our listener.
optsC := nextServerOpts(optsB)
optsC.Cluster.Advertise = lst.Addr().String()
optsC.ClientAdvertise = "me:1"
optsC.Routes = RoutesFromStr(srvARouteURL)
srvC := RunServer(optsC)
defer srvC.Shutdown()
select {
case e := <-ch:
if e != nil {
t.Fatalf("Error: %v", e)
}
case <-time.After(2 * time.Second):
t.Fatalf("Test timed out")
}
}
func TestClusterAdvertiseErrorOnStartup(t *testing.T) {
opts := DefaultOptions()
// Set invalid address
opts.Cluster.Advertise = "addr:::123"
s := New(opts)
defer s.Shutdown()
dl := &DummyLogger{}
s.SetLogger(dl, false, false)
// Start will keep running, so start in a go-routine.
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
s.Start()
wg.Done()
}()
msg := ""
ok := false
timeout := time.Now().Add(2 * time.Second)
for time.Now().Before(timeout) {
dl.Lock()
msg = dl.msg
dl.Unlock()
if strings.Contains(msg, "Cluster.Advertise") {
ok = true
break
}
}
if !ok {
t.Fatalf("Did not get expected error, got %v", msg)
}
s.Shutdown()
wg.Wait()
}
func TestClientAdvertise(t *testing.T) {
optsA, _ := ProcessConfigFile("./configs/seed.conf")
optsA.NoSigs, optsA.NoLog = true, true
srvA := RunServer(optsA)
defer srvA.Shutdown()
srvA.mu.Lock()
if srvA.routeInfo.Host != "example.com" {
t.Fatalf("Expected srvA Route Advertise to be example.com:80, got: %v:%d",
srvA.routeInfo.Host, srvA.routeInfo.Port)
}
// using example.com, but don't expect anything to try to connect to it.
if srvA.routeInfo.IP != "nats-route://example.com:80/" {
t.Fatalf("Expected srvA.routeInfo.IP to be set, got %v", srvA.routeInfo.IP)
}
srvA.mu.Unlock()
srvSeed.mu.Lock()
if srvSeed.routeInfo.IP != "" {
t.Fatalf("Expected srvSeed.routeInfo.IP to not be set, got %v", srvSeed.routeInfo.IP)
}
srvSeed.mu.Unlock()
optsB := nextServerOpts(optsA)
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
optsB.ClientAdvertise = "me:1"
srvB := RunServer(optsB)
defer srvB.Shutdown()
// create a TCP client, connect to srvA Cluster and verify info.
testCn, _ := net.Dial("tcp", net.JoinHostPort(optsA.Cluster.Host, strconv.Itoa(optsA.Cluster.Port)))
defer testCn.Close()
msg, _ := bufio.NewReader(testCn).ReadString('\n')
var retInfo Info
err := json.Unmarshal([]byte(strings.TrimLeft(msg, "INFO ")), &retInfo)
checkClusterFormed(t, srvA, srvB)
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port))
if err != nil {
t.Fatalf("Unable to read response: %v", err)
t.Fatalf("Error on connect: %v", err)
}
if retInfo.Host != "example.com" && retInfo.Port != optsA.Cluster.Port {
t.Fatalf("Host and Port from cluster incorrect: got %s:%d", retInfo.Host, retInfo.Port)
defer nc.Close()
timeout := time.Now().Add(time.Second)
good := false
for time.Now().Before(timeout) {
ds := nc.DiscoveredServers()
if len(ds) == 1 {
if ds[0] == "nats://me:1" {
good = true
break
}
}
time.Sleep(15 * time.Millisecond)
}
if retInfo.IP != "nats-route://example.com:80/" {
t.Fatalf("IP incorrected expected: nats-route://example.com:80/, got: %s", retInfo.IP)
if !good {
t.Fatalf("Did not get expected discovered servers: %v", nc.DiscoveredServers())
}
}

View File

@@ -86,6 +86,12 @@ type Server struct {
debug int32
}
// These store the real client/cluster listen ports. They are
// required during config reload to reset the Options (after
// reload) to the actual listen port values.
clientActualPort int
clusterActualPort int
// Used by tests to check that http.Servers do
// not set any timeout.
monitoringServer *http.Server
@@ -109,23 +115,12 @@ func New(opts *Options) *Server {
tlsReq := opts.TLSConfig != nil
verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
// configure host/port if advertise is set
host := opts.Host
port := opts.Port
if opts.ClientAdvertise != "" {
h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port)
if err == nil {
host = h
port = p
}
}
info := Info{
ID: genID(),
Version: VERSION,
GoVersion: runtime.Version(),
Host: host,
Port: port,
Host: opts.Host,
Port: opts.Port,
AuthRequired: false,
TLSRequired: tlsReq,
SSLRequired: tlsReq,
@@ -148,6 +143,12 @@ func New(opts *Options) *Server {
s.mu.Lock()
defer s.mu.Unlock()
// This is normally done in the AcceptLoop, once the
// listener has been created (possibly with random port),
// but since some tests may expect the INFO to be properly
// set after New(), let's do it now.
s.setInfoHostPortAndGenerateJSON()
// For tracking clients
s.clients = make(map[uint64]*client)
@@ -166,7 +167,6 @@ func New(opts *Options) *Server {
// Used to setup Authorization.
s.configureAuthorization()
s.generateServerInfoJSON()
s.handleSignals()
return s
@@ -421,19 +421,19 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
// to 0 at the beginning this function. So we need to get the actual port
if opts.Port == 0 {
// Write resolved port back to options.
_, port, err := net.SplitHostPort(l.Addr().String())
if err != nil {
s.Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e)
s.mu.Unlock()
return
}
portNum, err := strconv.Atoi(port)
if err != nil {
s.Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e)
s.mu.Unlock()
return
}
opts.Port = portNum
opts.Port = l.Addr().(*net.TCPAddr).Port
}
// Keep track of actual listen port. This will be needed in case of
// config reload.
s.clientActualPort = opts.Port
// Now that port has been set (if it was set to RANDOM), set the
// server's info Host/Port with either values from Options or
// ClientAdvertise. Also generate the JSON byte array.
if err := s.setInfoHostPortAndGenerateJSON(); err != nil {
s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", s.opts.ClientAdvertise, err)
s.mu.Unlock()
return
}
s.mu.Unlock()
@@ -469,6 +469,31 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
s.done <- true
}
// This function sets the server's info Host/Port based on server Options.
// Note that this function may be called during config reload, this is why
// Host/Port may be reset to original Options if the ClientAdvertise option
// is not set (since it may have previously been).
// The function then generates the server infoJSON.
func (s *Server) setInfoHostPortAndGenerateJSON() error {
// When this function is called, opts.Port is set to the actual listen
// port (if option was originally set to RANDOM), even during a config
// reload. So use of s.opts.Port is safe.
if s.opts.ClientAdvertise != "" {
h, p, err := parseHostPort(s.opts.ClientAdvertise, s.opts.Port)
if err != nil {
return err
}
s.info.Host = h
s.info.Port = p
} else {
s.info.Host = s.opts.Host
s.info.Port = s.opts.Port
}
// (re)generate the infoJSON byte array.
s.generateServerInfoJSON()
return nil
}
// StartProfiler is called to enable dynamic profiling.
func (s *Server) StartProfiler() {
// Snapshot server options.

View File

@@ -249,7 +249,7 @@ func TestClientAdvertiseConnectURL(t *testing.T) {
s.Shutdown()
opts = DefaultOptions()
opts.Cluster.Port = 0
opts.Port = 0
opts.ClientAdvertise = "nats.example.com:7777"
s = New(opts)
if s.info.Host != "nats.example.com" && s.info.Port != 7777 {
@@ -257,7 +257,25 @@ func TestClientAdvertiseConnectURL(t *testing.T) {
s.info.Host, s.info.Port)
}
s.Shutdown()
}
func TestClientAdvertiseErrorOnStartup(t *testing.T) {
opts := DefaultOptions()
// Set invalid address
opts.ClientAdvertise = "addr:::123"
s := New(opts)
defer s.Shutdown()
dl := &DummyLogger{}
s.SetLogger(dl, false, false)
// Expect this to return due to failure
s.Start()
dl.Lock()
msg := dl.msg
dl.Unlock()
if !strings.Contains(msg, "ClientAdvertise") {
t.Fatalf("Unexpected error: %v", msg)
}
}
func TestNoDeadlockOnStartFailure(t *testing.T) {

View File

@@ -74,7 +74,8 @@ func secondsToDuration(seconds float64) time.Duration {
return time.Duration(ttl)
}
// Parse a host/port string with an optional default port
// Parse a host/port string with a default port to use
// if none (or 0 or -1) is specified in `hostPort` string.
func parseHostPort(hostPort string, defaultPort int) (host string, port int, err error) {
if hostPort != "" {
host, sPort, err := net.SplitHostPort(hostPort)
@@ -90,6 +91,9 @@ func parseHostPort(hostPort string, defaultPort int) (host string, port int, err
if err != nil {
return "", -1, err
}
if port == 0 || port == -1 {
port = defaultPort
}
return strings.TrimSpace(host), port, nil
}
return "", -1, errors.New("No hostport specified")

View File

@@ -30,6 +30,42 @@ func TestParseSInt64(t *testing.T) {
}
}
func TestParseHostPort(t *testing.T) {
check := func(hostPort string, defaultPort int, expectedHost string, expectedPort int, expectedErr bool) {
h, p, err := parseHostPort(hostPort, defaultPort)
if expectedErr {
if err == nil {
stackFatalf(t, "Expected an error, did not get one")
}
// expected error, so we are done
return
}
if !expectedErr && err != nil {
stackFatalf(t, "Unexpected error: %v", err)
}
if expectedHost != h {
stackFatalf(t, "Expected host %q, got %q", expectedHost, h)
}
if expectedPort != p {
stackFatalf(t, "Expected port %d, got %d", expectedPort, p)
}
}
check("addr:1234", 5678, "addr", 1234, false)
check(" addr:1234 ", 5678, "addr", 1234, false)
check(" addr : 1234 ", 5678, "addr", 1234, false)
check("addr", 5678, "addr", 5678, false)
check(" addr ", 5678, "addr", 5678, false)
check("addr:-1", 5678, "addr", 5678, false)
check(" addr:-1 ", 5678, "addr", 5678, false)
check(" addr : -1 ", 5678, "addr", 5678, false)
check("addr:0", 5678, "addr", 5678, false)
check(" addr:0 ", 5678, "addr", 5678, false)
check(" addr : 0 ", 5678, "addr", 5678, false)
check("addr:addr", 0, "", 0, true)
check("addr:::1234", 0, "", 0, true)
check("", 0, "", 0, true)
}
func BenchmarkParseInt(b *testing.B) {
b.SetBytes(1)
n := "12345678"

View File

@@ -3,6 +3,7 @@
package test
import (
"encoding/json"
"testing"
"time"
@@ -281,3 +282,25 @@ func TestControlLineMaximums(t *testing.T) {
send(pubTooLong)
expect(errRe)
}
func TestServerInfoWithClientAdvertise(t *testing.T) {
opts := DefaultTestOptions
opts.Port = PROTO_TEST_PORT
opts.ClientAdvertise = "me:1"
s := RunServer(&opts)
defer s.Shutdown()
c := createClientConn(t, opts.Host, PROTO_TEST_PORT)
defer c.Close()
buf := expectResult(t, c, infoRe)
js := infoRe.FindAllSubmatch(buf, 1)[0][1]
var sinfo server.Info
err := json.Unmarshal(js, &sinfo)
if err != nil {
t.Fatalf("Could not unmarshal INFO json: %v\n", err)
}
if sinfo.Host != "me" || sinfo.Port != 1 {
t.Fatalf("Expected INFO Host:Port to be me:1, got %s:%d", sinfo.Host, sinfo.Port)
}
}