From 6852298e7be0df346ba978d2484e0090b9f607c4 Mon Sep 17 00:00:00 2001 From: Peter Miron Date: Mon, 27 Nov 2017 15:34:15 -0500 Subject: [PATCH 1/5] draft of fix for issue #447. allows advertising separate host:ports to client. --- server/opts.go | 8 +++++ server/server.go | 84 ++++++++++++++++++++++++++----------------- server/server_test.go | 40 +++++++++++++++++++++ 3 files changed, 100 insertions(+), 32 deletions(-) diff --git a/server/opts.go b/server/opts.go index ea282ae6..baa0a2ff 100644 --- a/server/opts.go +++ b/server/opts.go @@ -30,6 +30,7 @@ type ClusterOpts struct { TLSTimeout float64 `json:"-"` TLSConfig *tls.Config `json:"-"` ListenStr string `json:"-"` + AdvertiseStr string `json:"-"` NoAdvertise bool `json:"-"` ConnectRetries int `json:"-"` } @@ -387,6 +388,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error { opts.Cluster.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert opts.Cluster.TLSConfig.RootCAs = opts.Cluster.TLSConfig.ClientCAs opts.Cluster.TLSTimeout = tc.Timeout + case "cluster_advertise": + opts.Cluster.AdvertiseStr = mv.(string) case "no_advertise": opts.Cluster.NoAdvertise = mv.(bool) case "connect_retries": @@ -753,6 +756,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { if flagOpts.Cluster.ListenStr != "" { opts.Cluster.ListenStr = flagOpts.Cluster.ListenStr } + if flagOpts.Cluster.AdvertiseStr != "" { + opts.Cluster.AdvertiseStr = flagOpts.Cluster.AdvertiseStr + } if flagOpts.Cluster.NoAdvertise { opts.Cluster.NoAdvertise = true } @@ -974,6 +980,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.") fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.") fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") + fs.StringVar(&opts.Cluster.AdvertiseStr, "cluster_advertise", "", "Cluster URL sent on info for use with proxied connections.") fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.") fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries") fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.") @@ -1171,6 +1178,7 @@ func overrideCluster(opts *Options) error { opts.Cluster.Username = "" opts.Cluster.Password = "" } + return nil } diff --git a/server/server.go b/server/server.go index 88b703d4..4ebd4936 100644 --- a/server/server.go +++ b/server/server.go @@ -1002,42 +1002,62 @@ func (s *Server) getClientConnectURLs() []string { sPort := strconv.Itoa(opts.Port) urls := make([]string, 0, 1) - ipAddr, err := net.ResolveIPAddr("ip", opts.Host) - // If the host is "any" (0.0.0.0 or ::), get specific IPs from available - // interfaces. - if err == nil && ipAddr.IP.IsUnspecified() { - var ip net.IP - ifaces, _ := net.Interfaces() - for _, i := range ifaces { - addrs, _ := i.Addrs() - for _, addr := range addrs { - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP + // short circuit if cluster-advertise is set + if opts.Cluster.AdvertiseStr != "" { + hosts := strings.Split(opts.Cluster.AdvertiseStr, ",") + + for _, i := range hosts { + hostPort := strings.Split(i, ":") + host := strings.TrimSpace(hostPort[0]) + + if len(hostPort) > 1 { + port := strings.TrimSpace(hostPort[1]) + // if a separate advertise port is set, use that. Otherwise, use the main listen port. + if port != "" { + sPort = port } - // Skip non global unicast addresses - if !ip.IsGlobalUnicast() || ip.IsUnspecified() { - ip = nil - continue + } + urls = append(urls, net.JoinHostPort(host, sPort)) + } + } else { + ipAddr, err := net.ResolveIPAddr("ip", opts.Host) + // If the host is "any" (0.0.0.0 or ::), get specific IPs from available + // interfaces. + if err == nil && ipAddr.IP.IsUnspecified() { + var ip net.IP + ifaces, _ := net.Interfaces() + for _, i := range ifaces { + addrs, _ := i.Addrs() + for _, addr := range addrs { + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + // Skip non global unicast addresses + if !ip.IsGlobalUnicast() || ip.IsUnspecified() { + ip = nil + continue + } + urls = append(urls, net.JoinHostPort(ip.String(), sPort)) } - urls = append(urls, net.JoinHostPort(ip.String(), sPort)) + } + } + if err != nil || len(urls) == 0 { + // We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some + // reason we could not add any URL in the loop above. + // We had a case where a Windows VM was hosed and would have err == nil + // and not add any address in the array in the loop above, and we + // ended-up returning 0.0.0.0, which is problematic for Windows clients. + // Check for 0.0.0.0 or :: specifically, and ignore if that's the case. + if opts.Host == "0.0.0.0" || opts.Host == "::" { + s.Errorf("Address %q can not be resolved properly", opts.Host) + } else { + urls = append(urls, net.JoinHostPort(opts.Host, sPort)) } } } - if err != nil || len(urls) == 0 { - // We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some - // reason we could not add any URL in the loop above. - // We had a case where a Windows VM was hosed and would have err == nil - // and not add any address in the array in the loop above, and we - // ended-up returning 0.0.0.0, which is problematic for Windows clients. - // Check for 0.0.0.0 or :: specifically, and ignore if that's the case. - if opts.Host == "0.0.0.0" || opts.Host == "::" { - s.Errorf("Address %q can not be resolved properly", opts.Host) - } else { - urls = append(urls, net.JoinHostPort(opts.Host, sPort)) - } - } + return urls } diff --git a/server/server_test.go b/server/server_test.go index 1ca9d1d1..a0714b75 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -213,6 +213,46 @@ func TestGetConnectURLs(t *testing.T) { } } +func TestClusterAdvertiseConnectURL(t *testing.T) { + opts := DefaultOptions() + opts.Port = 4222 + opts.Cluster.AdvertiseStr = "nats.example.com" + + s := New(opts) + + urls := s.getClientConnectURLs() + + if len(urls) != 1 { + t.Fatalf("Expected to get one url, got none: %v with Cluster.AdvertiseStr %v", + opts.Host, opts.Cluster.AdvertiseStr) + } + + if urls[0] != "nats.example.com:4222" { + t.Fatalf("Expected to get '%s', got: '%v'", "nats.example.com:4222", urls[0]) + } + s.Shutdown() + + opts.Cluster.AdvertiseStr = "nats.example.com, nats2.example.com:7777" + + s = New(opts) + + urls = s.getClientConnectURLs() + + if len(urls) != 2 { + t.Fatalf("Expected to get two urls, got %d: %v", len(urls), opts.Cluster.AdvertiseStr) + } + + if urls[0] != "nats.example.com:4222" { + t.Fatalf("Expected 'nats.example.com:4222', got: '%v'", urls[0]) + } + + if urls[1] != "nats2.example.com:7777" { + t.Fatalf("Expected 'nats2.example.com:7777', got: '%v'", urls[1]) + } + + s.Shutdown() +} + func TestNoDeadlockOnStartFailure(t *testing.T) { opts := DefaultOptions() opts.Host = "x.x.x.x" // bad host From 7d34b890c60d285fedc27824c02a96d2b780dd80 Mon Sep 17 00:00:00 2001 From: Peter Miron Date: Tue, 28 Nov 2017 09:55:35 -0500 Subject: [PATCH 2/5] Takes list of client connect addresses. Uses the first as the host / port sent on info. --- server/opts.go | 34 ++++++++++++++++++---------------- server/server.go | 37 ++++++++++++++++++++++++------------- server/server_test.go | 14 +++++++++----- 3 files changed, 51 insertions(+), 34 deletions(-) diff --git a/server/opts.go b/server/opts.go index baa0a2ff..495f50c2 100644 --- a/server/opts.go +++ b/server/opts.go @@ -22,17 +22,18 @@ import ( // ClusterOpts are options for clusters. type ClusterOpts struct { - Host string `json:"addr"` - Port int `json:"cluster_port"` - Username string `json:"-"` - Password string `json:"-"` - AuthTimeout float64 `json:"auth_timeout"` - TLSTimeout float64 `json:"-"` - TLSConfig *tls.Config `json:"-"` - ListenStr string `json:"-"` - AdvertiseStr string `json:"-"` - NoAdvertise bool `json:"-"` - ConnectRetries int `json:"-"` + Host string `json:"addr"` + Port int `json:"cluster_port"` + Username string `json:"-"` + Password string `json:"-"` + AuthTimeout float64 `json:"auth_timeout"` + TLSTimeout float64 `json:"-"` + TLSConfig *tls.Config `json:"-"` + ListenStr string `json:"-"` + ClientAdvertiseStr string `json:"-"` + ClusterAdvertiseStr string `json:"-"` + NoAdvertise bool `json:"-"` + ConnectRetries int `json:"-"` } // Options block for gnatsd server. @@ -388,8 +389,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error { opts.Cluster.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert opts.Cluster.TLSConfig.RootCAs = opts.Cluster.TLSConfig.ClientCAs opts.Cluster.TLSTimeout = tc.Timeout - case "cluster_advertise": - opts.Cluster.AdvertiseStr = mv.(string) + case "cluster_client_advertise": + opts.Cluster.ClientAdvertiseStr = mv.(string) case "no_advertise": opts.Cluster.NoAdvertise = mv.(bool) case "connect_retries": @@ -756,8 +757,8 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { if flagOpts.Cluster.ListenStr != "" { opts.Cluster.ListenStr = flagOpts.Cluster.ListenStr } - if flagOpts.Cluster.AdvertiseStr != "" { - opts.Cluster.AdvertiseStr = flagOpts.Cluster.AdvertiseStr + if flagOpts.Cluster.ClientAdvertiseStr != "" { + opts.Cluster.ClientAdvertiseStr = flagOpts.Cluster.ClientAdvertiseStr } if flagOpts.Cluster.NoAdvertise { opts.Cluster.NoAdvertise = true @@ -980,7 +981,8 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.") fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.") fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") - fs.StringVar(&opts.Cluster.AdvertiseStr, "cluster_advertise", "", "Cluster URL sent on info for use with proxied connections.") + fs.StringVar(&opts.Cluster.ClientAdvertiseStr, "cluster_client_advertise", "", "Client url(s) for discovered servers.") + fs.StringVar(&opts.Cluster.ClusterAdvertiseStr, "cluster_advertise", "", "Cluster url(s) for discovered servers.") fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.") fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries") fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.") diff --git a/server/server.go b/server/server.go index 4ebd4936..6c2556fe 100644 --- a/server/server.go +++ b/server/server.go @@ -1003,21 +1003,32 @@ func (s *Server) getClientConnectURLs() []string { urls := make([]string, 0, 1) // short circuit if cluster-advertise is set - if opts.Cluster.AdvertiseStr != "" { - hosts := strings.Split(opts.Cluster.AdvertiseStr, ",") + if opts.Cluster.ClientAdvertiseStr != "" { + hosts := strings.Split(opts.Cluster.ClientAdvertiseStr, ",") - for _, i := range hosts { - hostPort := strings.Split(i, ":") - host := strings.TrimSpace(hostPort[0]) - - if len(hostPort) > 1 { - port := strings.TrimSpace(hostPort[1]) - // if a separate advertise port is set, use that. Otherwise, use the main listen port. - if port != "" { - sPort = port - } + for n, i := range hosts { + host, port, err := net.SplitHostPort(i) + switch err.(type) { + case *net.AddrError: + // try appending the current port + host, port, err = net.SplitHostPort(i + ":" + sPort) } - urls = append(urls, net.JoinHostPort(host, sPort)) + + if err != nil { + s.Fatalf("Client Advertise Address error: %v, on entry: %s", err, i) + } + + // set the info host to the first address in a list + if n == 0 { + s.info.Host = host + s.info.Port, err = strconv.Atoi(port) + } + + if err != nil { + s.Fatalf("Client Advertise Address error: %v, on entry: %s", err, i) + } + + urls = append(urls, net.JoinHostPort(strings.TrimSpace(host), strings.TrimSpace(port))) } } else { ipAddr, err := net.ResolveIPAddr("ip", opts.Host) diff --git a/server/server_test.go b/server/server_test.go index a0714b75..e9a2d457 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -213,10 +213,10 @@ func TestGetConnectURLs(t *testing.T) { } } -func TestClusterAdvertiseConnectURL(t *testing.T) { +func TestClusterClientAdvertiseConnectURL(t *testing.T) { opts := DefaultOptions() opts.Port = 4222 - opts.Cluster.AdvertiseStr = "nats.example.com" + opts.Cluster.ClientAdvertiseStr = "nats.example.com" s := New(opts) @@ -224,7 +224,7 @@ func TestClusterAdvertiseConnectURL(t *testing.T) { if len(urls) != 1 { t.Fatalf("Expected to get one url, got none: %v with Cluster.AdvertiseStr %v", - opts.Host, opts.Cluster.AdvertiseStr) + opts.Host, opts.Cluster.ClientAdvertiseStr) } if urls[0] != "nats.example.com:4222" { @@ -232,14 +232,14 @@ func TestClusterAdvertiseConnectURL(t *testing.T) { } s.Shutdown() - opts.Cluster.AdvertiseStr = "nats.example.com, nats2.example.com:7777" + opts.Cluster.ClientAdvertiseStr = "nats.example.com, nats2.example.com:7777" s = New(opts) urls = s.getClientConnectURLs() if len(urls) != 2 { - t.Fatalf("Expected to get two urls, got %d: %v", len(urls), opts.Cluster.AdvertiseStr) + t.Fatalf("Expected to get two urls, got %d: %v", len(urls), opts.Cluster.ClientAdvertiseStr) } if urls[0] != "nats.example.com:4222" { @@ -253,6 +253,10 @@ func TestClusterAdvertiseConnectURL(t *testing.T) { s.Shutdown() } +func TestClusterAdvertiseConnectURL(t *testing.T) { + +} + func TestNoDeadlockOnStartFailure(t *testing.T) { opts := DefaultOptions() opts.Host = "x.x.x.x" // bad host From 4829592107315d3833ee4039589ed3bb3799e7cc Mon Sep 17 00:00:00 2001 From: Peter Miron Date: Wed, 29 Nov 2017 11:41:08 -0500 Subject: [PATCH 3/5] removed support for array of Advertise addresses. Added support for Route advertise address. --- server/opts.go | 114 +++++++++++++++++++++--------------------- server/route.go | 41 +++++++++++---- server/routes_test.go | 34 +++++++++++++ server/server.go | 38 ++++---------- server/server_test.go | 40 ++++++--------- server/util.go | 25 +++++++++ 6 files changed, 174 insertions(+), 118 deletions(-) diff --git a/server/opts.go b/server/opts.go index 495f50c2..afb8a7d1 100644 --- a/server/opts.go +++ b/server/opts.go @@ -22,59 +22,59 @@ import ( // ClusterOpts are options for clusters. type ClusterOpts struct { - Host string `json:"addr"` - Port int `json:"cluster_port"` - Username string `json:"-"` - Password string `json:"-"` - AuthTimeout float64 `json:"auth_timeout"` - TLSTimeout float64 `json:"-"` - TLSConfig *tls.Config `json:"-"` - ListenStr string `json:"-"` - ClientAdvertiseStr string `json:"-"` - ClusterAdvertiseStr string `json:"-"` - NoAdvertise bool `json:"-"` - ConnectRetries int `json:"-"` + Host string `json:"addr"` + Port int `json:"cluster_port"` + Username string `json:"-"` + Password string `json:"-"` + AuthTimeout float64 `json:"auth_timeout"` + TLSTimeout float64 `json:"-"` + TLSConfig *tls.Config `json:"-"` + ListenStr string `json:"-"` + RouteAdvertise string `json:"-"` + NoAdvertise bool `json:"-"` + ConnectRetries int `json:"-"` } // Options block for gnatsd server. type Options struct { - ConfigFile string `json:"-"` - Host string `json:"addr"` - Port int `json:"port"` - Trace bool `json:"-"` - Debug bool `json:"-"` - NoLog bool `json:"-"` - NoSigs bool `json:"-"` - Logtime bool `json:"-"` - MaxConn int `json:"max_connections"` - Users []*User `json:"-"` - Username string `json:"-"` - Password string `json:"-"` - Authorization string `json:"-"` - PingInterval time.Duration `json:"ping_interval"` - MaxPingsOut int `json:"ping_max"` - HTTPHost string `json:"http_host"` - HTTPPort int `json:"http_port"` - HTTPSPort int `json:"https_port"` - AuthTimeout float64 `json:"auth_timeout"` - MaxControlLine int `json:"max_control_line"` - MaxPayload int `json:"max_payload"` - Cluster ClusterOpts `json:"cluster"` - ProfPort int `json:"-"` - PidFile string `json:"-"` - LogFile string `json:"-"` - Syslog bool `json:"-"` - RemoteSyslog string `json:"-"` - Routes []*url.URL `json:"-"` - RoutesStr string `json:"-"` - TLSTimeout float64 `json:"tls_timeout"` - TLS bool `json:"-"` - TLSVerify bool `json:"-"` - TLSCert string `json:"-"` - TLSKey string `json:"-"` - TLSCaCert string `json:"-"` - TLSConfig *tls.Config `json:"-"` - WriteDeadline time.Duration `json:"-"` + ConfigFile string `json:"-"` + Host string `json:"addr"` + Port int `json:"port"` + ClientAdvertise string `json:"-"` + Trace bool `json:"-"` + Debug bool `json:"-"` + NoLog bool `json:"-"` + NoSigs bool `json:"-"` + Logtime bool `json:"-"` + MaxConn int `json:"max_connections"` + Users []*User `json:"-"` + Username string `json:"-"` + Password string `json:"-"` + Authorization string `json:"-"` + PingInterval time.Duration `json:"ping_interval"` + MaxPingsOut int `json:"ping_max"` + HTTPHost string `json:"http_host"` + HTTPPort int `json:"http_port"` + HTTPSPort int `json:"https_port"` + AuthTimeout float64 `json:"auth_timeout"` + MaxControlLine int `json:"max_control_line"` + MaxPayload int `json:"max_payload"` + Cluster ClusterOpts `json:"cluster"` + ProfPort int `json:"-"` + PidFile string `json:"-"` + LogFile string `json:"-"` + Syslog bool `json:"-"` + RemoteSyslog string `json:"-"` + Routes []*url.URL `json:"-"` + RoutesStr string `json:"-"` + TLSTimeout float64 `json:"tls_timeout"` + TLS bool `json:"-"` + TLSVerify bool `json:"-"` + TLSCert string `json:"-"` + TLSKey string `json:"-"` + TLSCaCert string `json:"-"` + TLSConfig *tls.Config `json:"-"` + WriteDeadline time.Duration `json:"-"` CustomClientAuthentication Authentication `json:"-"` CustomRouterAuthentication Authentication `json:"-"` @@ -204,6 +204,8 @@ func (o *Options) ProcessConfigFile(configFile string) error { } o.Host = hp.host o.Port = hp.port + case "client_advertise": + o.ClientAdvertise = v.(string) case "port": o.Port = int(v.(int64)) case "host", "net": @@ -389,8 +391,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error { opts.Cluster.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert opts.Cluster.TLSConfig.RootCAs = opts.Cluster.TLSConfig.ClientCAs opts.Cluster.TLSTimeout = tc.Timeout - case "cluster_client_advertise": - opts.Cluster.ClientAdvertiseStr = mv.(string) + case "route_advertise": + opts.Cluster.RouteAdvertise = mv.(string) case "no_advertise": opts.Cluster.NoAdvertise = mv.(bool) case "connect_retries": @@ -724,6 +726,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { if flagOpts.Host != "" { opts.Host = flagOpts.Host } + if flagOpts.ClientAdvertise != "" { + opts.ClientAdvertise = flagOpts.ClientAdvertise + } if flagOpts.Username != "" { opts.Username = flagOpts.Username } @@ -757,9 +762,6 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { if flagOpts.Cluster.ListenStr != "" { opts.Cluster.ListenStr = flagOpts.Cluster.ListenStr } - if flagOpts.Cluster.ClientAdvertiseStr != "" { - opts.Cluster.ClientAdvertiseStr = flagOpts.Cluster.ClientAdvertiseStr - } if flagOpts.Cluster.NoAdvertise { opts.Cluster.NoAdvertise = true } @@ -949,6 +951,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.StringVar(&opts.Host, "addr", "", "Network host to listen on.") fs.StringVar(&opts.Host, "a", "", "Network host to listen on.") fs.StringVar(&opts.Host, "net", "", "Network host to listen on.") + fs.StringVar(&opts.ClientAdvertise, "client_advertise", "", "Client url for discovered servers.") fs.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.") fs.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.") fs.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.") @@ -981,8 +984,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.") fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.") fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") - fs.StringVar(&opts.Cluster.ClientAdvertiseStr, "cluster_client_advertise", "", "Client url(s) for discovered servers.") - fs.StringVar(&opts.Cluster.ClusterAdvertiseStr, "cluster_advertise", "", "Cluster url(s) for discovered servers.") + fs.StringVar(&opts.Cluster.RouteAdvertise, "route_advertise", "", "Cluster url(s) for discovered servers.") fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.") fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries") fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.") diff --git a/server/route.go b/server/route.go index a10f1fcf..86861dfd 100644 --- a/server/route.go +++ b/server/route.go @@ -143,16 +143,20 @@ func (c *client) processRouteInfo(info *Info) { // Send our local subscriptions to this route. s.sendLocalSubsToRoute(c) if sendInfo { - // Need to get the remote IP address. - c.mu.Lock() - switch conn := c.nc.(type) { - case *net.TCPConn, *tls.Conn: - addr := conn.RemoteAddr().(*net.TCPAddr) - info.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(addr.IP.String(), strconv.Itoa(info.Port))) - default: - info.IP = c.route.url.String() + // If IP isn't already set on info + if info.IP == "" { + // Need to get the remote IP address. + c.mu.Lock() + switch conn := c.nc.(type) { + case *net.TCPConn, *tls.Conn: + addr := conn.RemoteAddr().(*net.TCPAddr) + info.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(addr.IP.String(), + strconv.Itoa(info.Port))) + default: + info.IP = c.route.url.String() + } + c.mu.Unlock() } - c.mu.Unlock() // Now let the known servers know about this new route s.forwardNewRouteInfoToKnownServers(info) } @@ -637,11 +641,26 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { // Check for TLSConfig tlsReq := opts.Cluster.TLSConfig != nil + // Configure Cluster Advertise Address + host := opts.Cluster.Host + port = l.Addr().(*net.TCPAddr).Port + ip := "" + if opts.Cluster.RouteAdvertise != "" { + advHost, advPort, err := parseHostPort(opts.Cluster.RouteAdvertise, strconv.Itoa(port)) + if err != nil { + s.Errorf("setting RouteAdvertise failed %v", err) + } else { + host = advHost + port = advPort + } + ip = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort))) + } info := Info{ ID: s.info.ID, Version: s.info.Version, - Host: opts.Cluster.Host, - Port: l.Addr().(*net.TCPAddr).Port, + Host: host, + Port: port, + IP: ip, AuthRequired: false, TLSRequired: tlsReq, SSLRequired: tlsReq, diff --git a/server/routes_test.go b/server/routes_test.go index 1b396d35..ad6bb8c8 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -53,6 +53,40 @@ func TestRouteConfig(t *testing.T) { } } +func TestRouteAdvertise(t *testing.T) { + // TODO: Need to work through this test case. may need to add a util proxy server + // to validate functionally. + optsSeed, _ := ProcessConfigFile("./configs/seed.conf") + + optsSeed.NoSigs, optsSeed.NoLog = true, false + optsSeed.Debug = true + + srvSeed := RunServer(optsSeed) + defer srvSeed.Shutdown() + + seedRouteUrl := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, + srvSeed.ClusterAddr().Port) + optsA := nextServerOpts(optsSeed) + optsA.Routes = RoutesFromStr(seedRouteUrl) + optsA.Cluster.Port = 9999 + optsA.Cluster.RouteAdvertise = "example.com:80" + + srvA := RunServer(optsA) + defer srvA.Shutdown() + + if srvA.routeInfo.Host != "example.com" { + t.Fatalf("Expected srvA Route Advertise to be example.com:80, got: %v:%d", + srvA.routeInfo.Host, srvA.routeInfo.Port) + } + // using example.com, but don't expect anything to try to connect to it. + if srvA.routeInfo.IP != "nats-route://example.com:80/" { + t.Fatalf("Expected srvA.routeInfo.IP to be set, got %v", srvA.routeInfo.IP) + } + if srvSeed.routeInfo.IP != "" { + t.Fatalf("Expected srvSeed.routeInfo.IP to not be set, got %v", srvSeed.routeInfo.IP) + } +} + func TestServerRoutesWithClients(t *testing.T) { optsA, _ := ProcessConfigFile("./configs/srv_a.conf") optsB, _ := ProcessConfigFile("./configs/srv_b.conf") diff --git a/server/server.go b/server/server.go index 6c2556fe..dd26d857 100644 --- a/server/server.go +++ b/server/server.go @@ -191,6 +191,7 @@ func (s *Server) generateRouteInfoJSON() { return } s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b)) + s.Errorf("route info: %s", b) } // PrintAndDie is exported for access in other packages. @@ -992,6 +993,7 @@ func (s *Server) startGoRoutine(f func()) { // getClientConnectURLs returns suitable URLs for clients to connect to the listen // port based on the server options' Host and Port. If the Host corresponds to // "any" interfaces, this call returns the list of resolved IP addresses. +// If ClientAdvertise is set, returns the client advertise host and port func (s *Server) getClientConnectURLs() []string { // Snapshot server options. opts := s.getOpts() @@ -1002,34 +1004,16 @@ func (s *Server) getClientConnectURLs() []string { sPort := strconv.Itoa(opts.Port) urls := make([]string, 0, 1) - // short circuit if cluster-advertise is set - if opts.Cluster.ClientAdvertiseStr != "" { - hosts := strings.Split(opts.Cluster.ClientAdvertiseStr, ",") - - for n, i := range hosts { - host, port, err := net.SplitHostPort(i) - switch err.(type) { - case *net.AddrError: - // try appending the current port - host, port, err = net.SplitHostPort(i + ":" + sPort) - } - - if err != nil { - s.Fatalf("Client Advertise Address error: %v, on entry: %s", err, i) - } - - // set the info host to the first address in a list - if n == 0 { - s.info.Host = host - s.info.Port, err = strconv.Atoi(port) - } - - if err != nil { - s.Fatalf("Client Advertise Address error: %v, on entry: %s", err, i) - } - - urls = append(urls, net.JoinHostPort(strings.TrimSpace(host), strings.TrimSpace(port))) + // short circuit if client advertise is set + ca := opts.ClientAdvertise + if ca != "" { + host, port, err := parseHostPort(ca, sPort) + s.info.Host = host // TODO: should not set these here. + s.info.Port = port + if err != nil { + s.Errorf("Client Advertise Address %v, on: %s", err, ca) } + urls = append(urls, net.JoinHostPort(host, strconv.Itoa(port))) } else { ipAddr, err := net.ResolveIPAddr("ip", opts.Host) // If the host is "any" (0.0.0.0 or ::), get specific IPs from available diff --git a/server/server_test.go b/server/server_test.go index e9a2d457..e2af3347 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -213,50 +213,42 @@ func TestGetConnectURLs(t *testing.T) { } } -func TestClusterClientAdvertiseConnectURL(t *testing.T) { +func TestClientAdvertiseConnectURL(t *testing.T) { opts := DefaultOptions() opts.Port = 4222 - opts.Cluster.ClientAdvertiseStr = "nats.example.com" - + opts.ClientAdvertise = "nats.example.com" s := New(opts) + defer s.Shutdown() urls := s.getClientConnectURLs() - if len(urls) != 1 { - t.Fatalf("Expected to get one url, got none: %v with Cluster.AdvertiseStr %v", - opts.Host, opts.Cluster.ClientAdvertiseStr) + t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v", + opts.Host, opts.ClientAdvertise) } - if urls[0] != "nats.example.com:4222" { t.Fatalf("Expected to get '%s', got: '%v'", "nats.example.com:4222", urls[0]) } s.Shutdown() - opts.Cluster.ClientAdvertiseStr = "nats.example.com, nats2.example.com:7777" - + opts.ClientAdvertise = "nats.example.com:7777" s = New(opts) - urls = s.getClientConnectURLs() - - if len(urls) != 2 { - t.Fatalf("Expected to get two urls, got %d: %v", len(urls), opts.Cluster.ClientAdvertiseStr) + if len(urls) != 1 { + t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v", + opts.Host, opts.ClientAdvertise) } - - if urls[0] != "nats.example.com:4222" { - t.Fatalf("Expected 'nats.example.com:4222', got: '%v'", urls[0]) + if urls[0] != "nats.example.com:7777" { + t.Fatalf("Expected 'nats.example.com:7777', got: '%v'", urls[0]) } - - if urls[1] != "nats2.example.com:7777" { - t.Fatalf("Expected 'nats2.example.com:7777', got: '%v'", urls[1]) + if s.info.Host != "nats.example.com" { + t.Fatalf("Expected host to be set to nats.example.com") + } + if s.info.Port != 7777 { + t.Fatalf("Expected port to be set to 7777") } - s.Shutdown() } -func TestClusterAdvertiseConnectURL(t *testing.T) { - -} - func TestNoDeadlockOnStartFailure(t *testing.T) { opts := DefaultOptions() opts.Host = "x.x.x.x" // bad host diff --git a/server/util.go b/server/util.go index 06730b4d..3f7b3da9 100644 --- a/server/util.go +++ b/server/util.go @@ -3,6 +3,10 @@ package server import ( + "errors" + "net" + "strconv" + "strings" "time" "github.com/nats-io/nuid" @@ -68,3 +72,24 @@ func secondsToDuration(seconds float64) time.Duration { ttl := seconds * float64(time.Second) return time.Duration(ttl) } + +// Parse a host/port string with an optional default port +func parseHostPort(hostPort string, defaultPort string) (host string, port int, err error) { + if hostPort != "" { + host, sPort, err := net.SplitHostPort(hostPort) + switch err.(type) { + case *net.AddrError: + // try appending the current port + host, sPort, err = net.SplitHostPort(hostPort + ":" + defaultPort) + } + if err != nil { + return "", -1, err + } + port, err = strconv.Atoi(strings.TrimSpace(sPort)) + if err != nil { + return "", -1, err + } + return strings.TrimSpace(host), port, nil + } + return "", -1, errors.New("No hostport specified") +} From 306a3f95073793335fbe5da6c6e4f457e31506c9 Mon Sep 17 00:00:00 2001 From: Peter Miron Date: Wed, 29 Nov 2017 15:35:05 -0500 Subject: [PATCH 4/5] Resolving Ivan's feedback. --- server/route.go | 2 +- server/routes_test.go | 26 +++++++++++++++++++++++--- server/server.go | 30 +++++++++++++++++------------- server/server_test.go | 11 +++++++++++ server/util.go | 5 +++-- 5 files changed, 55 insertions(+), 19 deletions(-) diff --git a/server/route.go b/server/route.go index 86861dfd..dd7ec784 100644 --- a/server/route.go +++ b/server/route.go @@ -646,7 +646,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { port = l.Addr().(*net.TCPAddr).Port ip := "" if opts.Cluster.RouteAdvertise != "" { - advHost, advPort, err := parseHostPort(opts.Cluster.RouteAdvertise, strconv.Itoa(port)) + advHost, advPort, err := parseHostPort(opts.Cluster.RouteAdvertise, port) if err != nil { s.Errorf("setting RouteAdvertise failed %v", err) } else { diff --git a/server/routes_test.go b/server/routes_test.go index ad6bb8c8..9e871489 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -3,6 +3,8 @@ package server import ( + "bufio" + "encoding/json" "fmt" "net" "net/url" @@ -58,9 +60,7 @@ func TestRouteAdvertise(t *testing.T) { // to validate functionally. optsSeed, _ := ProcessConfigFile("./configs/seed.conf") - optsSeed.NoSigs, optsSeed.NoLog = true, false - optsSeed.Debug = true - + optsSeed.NoSigs, optsSeed.NoLog = true, true srvSeed := RunServer(optsSeed) defer srvSeed.Shutdown() @@ -74,6 +74,7 @@ func TestRouteAdvertise(t *testing.T) { srvA := RunServer(optsA) defer srvA.Shutdown() + srvA.mu.Lock() if srvA.routeInfo.Host != "example.com" { t.Fatalf("Expected srvA Route Advertise to be example.com:80, got: %v:%d", srvA.routeInfo.Host, srvA.routeInfo.Port) @@ -82,9 +83,28 @@ func TestRouteAdvertise(t *testing.T) { if srvA.routeInfo.IP != "nats-route://example.com:80/" { t.Fatalf("Expected srvA.routeInfo.IP to be set, got %v", srvA.routeInfo.IP) } + srvA.mu.Unlock() + srvSeed.mu.Lock() if srvSeed.routeInfo.IP != "" { t.Fatalf("Expected srvSeed.routeInfo.IP to not be set, got %v", srvSeed.routeInfo.IP) } + srvSeed.mu.Unlock() + + // create a TCP client, connect to srvA Cluster and verify info. + testCn, _ := net.Dial("tcp", net.JoinHostPort(optsA.Cluster.Host, strconv.Itoa(optsA.Cluster.Port))) + defer testCn.Close() + msg, _ := bufio.NewReader(testCn).ReadString('\n') + var retInfo Info + err := json.Unmarshal([]byte(strings.TrimLeft(msg, "INFO ")), &retInfo) + if err != nil { + t.Fatalf("Unable to read response: %v", err) + } + if retInfo.Host != "example.com" && retInfo.Port != optsA.Cluster.Port { + t.Fatalf("Host and Port from cluster incorrect: got %s:%d", retInfo.Host, retInfo.Port) + } + if retInfo.IP != "nats-route://example.com:80/" { + t.Fatalf("IP incorrected expected: nats-route://example.com:80/, got: %s", retInfo.IP) + } } func TestServerRoutesWithClients(t *testing.T) { diff --git a/server/server.go b/server/server.go index dd26d857..da041e5f 100644 --- a/server/server.go +++ b/server/server.go @@ -109,12 +109,23 @@ func New(opts *Options) *Server { tlsReq := opts.TLSConfig != nil verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert) + // configure host/port if advertise is set + host := opts.Host + port := opts.Port + if opts.ClientAdvertise != "" { + h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port) + if err == nil { + host = h + port = p + } + } + info := Info{ ID: genID(), Version: VERSION, GoVersion: runtime.Version(), - Host: opts.Host, - Port: opts.Port, + Host: host, + Port: port, AuthRequired: false, TLSRequired: tlsReq, SSLRequired: tlsReq, @@ -191,7 +202,6 @@ func (s *Server) generateRouteInfoJSON() { return } s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b)) - s.Errorf("route info: %s", b) } // PrintAndDie is exported for access in other packages. @@ -1001,20 +1011,14 @@ func (s *Server) getClientConnectURLs() []string { s.mu.Lock() defer s.mu.Unlock() - sPort := strconv.Itoa(opts.Port) urls := make([]string, 0, 1) // short circuit if client advertise is set - ca := opts.ClientAdvertise - if ca != "" { - host, port, err := parseHostPort(ca, sPort) - s.info.Host = host // TODO: should not set these here. - s.info.Port = port - if err != nil { - s.Errorf("Client Advertise Address %v, on: %s", err, ca) - } - urls = append(urls, net.JoinHostPort(host, strconv.Itoa(port))) + if opts.ClientAdvertise != "" { + // just use the info host/port. This is updated in s.New() + urls = append(urls, net.JoinHostPort(s.info.Host, strconv.Itoa(s.info.Port))) } else { + sPort := strconv.Itoa(opts.Port) ipAddr, err := net.ResolveIPAddr("ip", opts.Host) // If the host is "any" (0.0.0.0 or ::), get specific IPs from available // interfaces. diff --git a/server/server_test.go b/server/server_test.go index e2af3347..5d051237 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -247,6 +247,17 @@ func TestClientAdvertiseConnectURL(t *testing.T) { t.Fatalf("Expected port to be set to 7777") } s.Shutdown() + + opts = DefaultOptions() + opts.Cluster.Port = 0 + opts.ClientAdvertise = "nats.example.com:7777" + s = New(opts) + if s.info.Host != "nats.example.com" && s.info.Port != 7777 { + t.Fatalf("Expected Client Advertise Host:Port to be nats.example.com:7777, got: %s:%d", + s.info.Host, s.info.Port) + } + s.Shutdown() + } func TestNoDeadlockOnStartFailure(t *testing.T) { diff --git a/server/util.go b/server/util.go index 3f7b3da9..7abb158d 100644 --- a/server/util.go +++ b/server/util.go @@ -4,6 +4,7 @@ package server import ( "errors" + "fmt" "net" "strconv" "strings" @@ -74,13 +75,13 @@ func secondsToDuration(seconds float64) time.Duration { } // Parse a host/port string with an optional default port -func parseHostPort(hostPort string, defaultPort string) (host string, port int, err error) { +func parseHostPort(hostPort string, defaultPort int) (host string, port int, err error) { if hostPort != "" { host, sPort, err := net.SplitHostPort(hostPort) switch err.(type) { case *net.AddrError: // try appending the current port - host, sPort, err = net.SplitHostPort(hostPort + ":" + defaultPort) + host, sPort, err = net.SplitHostPort(fmt.Sprintf("%s:%d", hostPort, defaultPort)) } if err != nil { return "", -1, err From acf4a31e4bf9301c98c9089a18daeee96f81930f Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 5 Feb 2018 20:15:36 -0700 Subject: [PATCH 5/5] Major updates + support for config reload of client/cluster advertise --- README.md | 19 +++++ main.go | 2 + server/log_test.go | 20 ++++- server/opts.go | 13 ++-- server/reload.go | 45 ++++++++++- server/reload_test.go | 168 +++++++++++++++++++++++++++++++++++++++++- server/route.go | 69 +++++++++++------ server/routes_test.go | 152 +++++++++++++++++++++++++++----------- server/server.go | 79 +++++++++++++------- server/server_test.go | 20 ++++- server/util.go | 6 +- server/util_test.go | 36 +++++++++ test/proto_test.go | 23 ++++++ 13 files changed, 547 insertions(+), 105 deletions(-) diff --git a/README.md b/README.md index c5e19348..4b37a319 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,7 @@ Server Options: -ms,--https_port Use port for https monitoring -c, --config Configuration file -sl,--signal [=] Send signal to gnatsd process (stop, quit, reopen, reload) + --client_advertise Client URL to advertise to other servers Logging Options: -l, --log File to redirect log output @@ -178,6 +179,7 @@ Cluster Options: --routes Routes to solicit and connect --cluster Cluster URL for solicited routes --no_advertise Advertise known cluster IPs to clients + --cluster_advertise Cluster URL to advertise to other servers --connect_retries For implicit routes, number of connect retries @@ -286,6 +288,23 @@ The `--routes` flag specifies the NATS URL for one or more servers in the cluste Previous releases required you to build the complete mesh using the `--routes` flag. To define your cluster in the current release, please follow the "Basic example" as described below. +Suppose that server srvA is connected to server srvB. A bi-directional route exists between srvA and srvB. A new server, srvC, connects to srvA.
+When accepting the connection, srvA will gossip the address of srvC to srvB so that srvB connects to srvC, completing the full mesh.
+The URL that srvB will use to connect to srvC is the result of the TCP remote address that srvA got from its connection to srvC. + +It is possible to advertise with `--cluster_advertise` a different address than the one used in `--cluster`. + +In the previous example, if srvC uses a `--cluster_adertise` URL, this is what srvA will gossip to srvB in order to connect to srvC. + +NOTE: The advertise address should really result in a connection to srvC. Providing an address that would result in a connection to a different NATS Server would prevent the formation of a full-mesh cluster! + +As part of the gossip protocol, a server will also send to the other servers the URL clients should connect to.
+The URL is the one defined in the `listen` parameter, or, if 0.0.0.0 or :: is specified, the resolved non-local IP addresses for the "any" interface. + +If those addresses are not reacheable from the outside world where the clients are running, the administrator can use the `--no_advertise` option to disable servers gossiping those URLs.
+Another option is to provide a `--client_advertise` URL to use instead. If this option is specified (and advertise has not been disabled), then the server will advertise this URL to other servers instead of its `listen` address (or resolved IPs when listen is 0.0.0.0 or ::). + + ### Basic example NATS makes building the full mesh easy. Simply designate a server to be a *seed* server. All other servers in the cluster simply specify the *seed* server as its server's routes option as indicated below. diff --git a/main.go b/main.go index 810580c6..c33a0bed 100644 --- a/main.go +++ b/main.go @@ -21,6 +21,7 @@ Server Options: -ms,--https_port Use port for https monitoring -c, --config Configuration file -sl,--signal [=] Send signal to gnatsd process (stop, quit, reopen, reload) + --client_advertise Client URL to advertise to other servers Logging Options: -l, --log File to redirect log output @@ -47,6 +48,7 @@ Cluster Options: --routes Routes to solicit and connect --cluster Cluster URL for solicited routes --no_advertise Advertise known cluster IPs to clients + --cluster_advertise Cluster URL to advertise to other servers --connect_retries For implicit routes, number of connect retries diff --git a/server/log_test.go b/server/log_test.go index f93f2477..113495ef 100644 --- a/server/log_test.go +++ b/server/log_test.go @@ -8,6 +8,7 @@ import ( "os" "runtime" "strings" + "sync" "testing" "github.com/nats-io/gnatsd/logger" @@ -64,28 +65,41 @@ func TestSetLogger(t *testing.T) { } type DummyLogger struct { + sync.Mutex msg string } -func (dl *DummyLogger) checkContent(t *testing.T, expectedStr string) { - if dl.msg != expectedStr { - stackFatalf(t, "Expected log to be: %v, got %v", expectedStr, dl.msg) +func (l *DummyLogger) checkContent(t *testing.T, expectedStr string) { + l.Lock() + defer l.Unlock() + if l.msg != expectedStr { + stackFatalf(t, "Expected log to be: %v, got %v", expectedStr, l.msg) } } func (l *DummyLogger) Noticef(format string, v ...interface{}) { + l.Lock() + defer l.Unlock() l.msg = fmt.Sprintf(format, v...) } func (l *DummyLogger) Errorf(format string, v ...interface{}) { + l.Lock() + defer l.Unlock() l.msg = fmt.Sprintf(format, v...) } func (l *DummyLogger) Fatalf(format string, v ...interface{}) { + l.Lock() + defer l.Unlock() l.msg = fmt.Sprintf(format, v...) } func (l *DummyLogger) Debugf(format string, v ...interface{}) { + l.Lock() + defer l.Unlock() l.msg = fmt.Sprintf(format, v...) } func (l *DummyLogger) Tracef(format string, v ...interface{}) { + l.Lock() + defer l.Unlock() l.msg = fmt.Sprintf(format, v...) } diff --git a/server/opts.go b/server/opts.go index afb8a7d1..75484857 100644 --- a/server/opts.go +++ b/server/opts.go @@ -30,7 +30,7 @@ type ClusterOpts struct { TLSTimeout float64 `json:"-"` TLSConfig *tls.Config `json:"-"` ListenStr string `json:"-"` - RouteAdvertise string `json:"-"` + Advertise string `json:"-"` NoAdvertise bool `json:"-"` ConnectRetries int `json:"-"` } @@ -391,8 +391,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error { opts.Cluster.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert opts.Cluster.TLSConfig.RootCAs = opts.Cluster.TLSConfig.ClientCAs opts.Cluster.TLSTimeout = tc.Timeout - case "route_advertise": - opts.Cluster.RouteAdvertise = mv.(string) + case "cluster_advertise", "advertise": + opts.Cluster.Advertise = mv.(string) case "no_advertise": opts.Cluster.NoAdvertise = mv.(bool) case "connect_retries": @@ -768,6 +768,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options { if flagOpts.Cluster.ConnectRetries != 0 { opts.Cluster.ConnectRetries = flagOpts.Cluster.ConnectRetries } + if flagOpts.Cluster.Advertise != "" { + opts.Cluster.Advertise = flagOpts.Cluster.Advertise + } if flagOpts.RoutesStr != "" { mergeRoutes(&opts, flagOpts) } @@ -951,7 +954,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.StringVar(&opts.Host, "addr", "", "Network host to listen on.") fs.StringVar(&opts.Host, "a", "", "Network host to listen on.") fs.StringVar(&opts.Host, "net", "", "Network host to listen on.") - fs.StringVar(&opts.ClientAdvertise, "client_advertise", "", "Client url for discovered servers.") + fs.StringVar(&opts.ClientAdvertise, "client_advertise", "", "Client URL to advertise to other servers.") fs.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.") fs.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.") fs.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.") @@ -984,7 +987,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.") fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.") fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") - fs.StringVar(&opts.Cluster.RouteAdvertise, "route_advertise", "", "Cluster url(s) for discovered servers.") + fs.StringVar(&opts.Cluster.Advertise, "cluster_advertise", "", "Cluster URL to advertise to other servers.") fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.") fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries") fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.") diff --git a/server/reload.go b/server/reload.go index 0593cdbf..8d2ca2e8 100644 --- a/server/reload.go +++ b/server/reload.go @@ -237,7 +237,7 @@ func (c *clusterOption) Apply(server *Server) { server.routeInfo.SSLRequired = tlsRequired server.routeInfo.TLSVerify = tlsRequired server.routeInfo.AuthRequired = c.newValue.Username != "" - server.generateRouteInfoJSON() + server.setRouteInfoHostPortAndIP() server.mu.Unlock() server.Noticef("Reloaded: cluster") } @@ -407,6 +407,20 @@ func (w *writeDeadlineOption) Apply(server *Server) { server.Noticef("Reloaded: write_deadline = %s", w.newValue) } +// clientAdvertiseOption implements the option interface for the `client_advertise` setting. +type clientAdvertiseOption struct { + noopOption + newValue string +} + +// Apply the setting by updating the server info and regenerate the infoJSON byte array. +func (c *clientAdvertiseOption) Apply(server *Server) { + server.mu.Lock() + server.setInfoHostPortAndGenerateJSON() + server.mu.Unlock() + server.Noticef("Reload: client_advertise = %s", c.newValue) +} + // Reload reads the current configuration file and applies any supported // changes. This returns an error if the server was not started with a config // file or an option which doesn't support hot-swapping was changed. @@ -422,11 +436,25 @@ func (s *Server) Reload() error { // TODO: Dump previous good config to a .bak file? return err } + clientOrgPort := s.clientActualPort + clusterOrgPort := s.clusterActualPort s.mu.Unlock() // Apply flags over config file settings. newOpts = MergeOptions(newOpts, FlagSnapshot) processOptions(newOpts) + + // processOptions sets Port to 0 if set to -1 (RANDOM port) + // If that's the case, set it to the saved value when the accept loop was + // created. + if newOpts.Port == 0 { + newOpts.Port = clientOrgPort + } + // We don't do that for cluster, so check against -1. + if newOpts.Cluster.Port == -1 { + newOpts.Cluster.Port = clusterOrgPort + } + if err := s.reloadOptions(newOpts); err != nil { return err } @@ -518,6 +546,15 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &maxPingsOutOption{newValue: newValue.(int)}) case "writedeadline": diffOpts = append(diffOpts, &writeDeadlineOption{newValue: newValue.(time.Duration)}) + case "clientadvertise": + cliAdv := newValue.(string) + if cliAdv != "" { + // Validate ClientAdvertise syntax + if _, _, err := parseHostPort(cliAdv, 0); err != nil { + return nil, fmt.Errorf("invalid ClientAdvertise value of %s, err=%v", cliAdv, err) + } + } + diffOpts = append(diffOpts, &clientAdvertiseOption{newValue: cliAdv}) case "nolog": // Ignore NoLog option since it's not parsed and only used in // testing. @@ -612,6 +649,12 @@ func validateClusterOpts(old, new ClusterOpts) error { return fmt.Errorf("Config reload not supported for cluster port: old=%d, new=%d", old.Port, new.Port) } + // Validate Cluster.Advertise syntax + if new.Advertise != "" { + if _, _, err := parseHostPort(new.Advertise, 0); err != nil { + return fmt.Errorf("invalid Cluster.Advertise value of %s, err=%v", new.Advertise, err) + } + } return nil } diff --git a/server/reload_test.go b/server/reload_test.go index 18ee6d66..5861c3fd 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -3,6 +3,7 @@ package server import ( + "encoding/json" "fmt" "io/ioutil" "net" @@ -187,7 +188,7 @@ func TestConfigReload(t *testing.T) { if err := ioutil.WriteFile(platformConf, content, 0666); err != nil { t.Fatalf("Unable to write config file: %v", err) } - server, opts, config := newServerWithSymlinkConfig(t, "tmp.conf", "./configs/reload/test.conf") + server, opts, config := runServerWithSymlinkConfig(t, "tmp.conf", "./configs/reload/test.conf") defer os.Remove(config) defer server.Shutdown() @@ -200,6 +201,7 @@ func TestConfigReload(t *testing.T) { AuthTimeout: 1.0, Debug: false, Trace: false, + NoLog: true, Logtime: false, MaxControlLine: 1024, MaxPayload: 1048576, @@ -209,12 +211,12 @@ func TestConfigReload(t *testing.T) { WriteDeadline: 2 * time.Second, Cluster: ClusterOpts{ Host: "localhost", - Port: -1, + Port: server.ClusterAddr().Port, }, } processOptions(golden) - if !reflect.DeepEqual(golden, server.getOpts()) { + if !reflect.DeepEqual(golden, opts) { t.Fatalf("Options are incorrect.\nexpected: %+v\ngot: %+v", golden, opts) } @@ -1372,6 +1374,166 @@ func TestConfigReloadClusterRoutes(t *testing.T) { } } +func TestConfigReloadClusterAdvertise(t *testing.T) { + conf := "routeadv.conf" + if err := ioutil.WriteFile(conf, []byte(` + listen: "0.0.0.0:-1" + cluster: { + listen: "0.0.0.0:-1" + } + `), 0666); err != nil { + t.Fatalf("Error creating config file: %v", err) + } + defer os.Remove(conf) + opts, err := ProcessConfigFile(conf) + if err != nil { + t.Fatalf("Error processing config file: %v", err) + } + opts.NoLog = true + s := RunServer(opts) + defer s.Shutdown() + + orgClusterPort := s.ClusterAddr().Port + + updateConfig := func(content string) { + if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil { + stackFatalf(t, "Error creating config file: %v", err) + } + if err := s.Reload(); err != nil { + stackFatalf(t, "Error on reload: %v", err) + } + } + + verify := func(expectedHost string, expectedPort int, expectedIP string) { + s.mu.Lock() + routeInfo := s.routeInfo + routeInfoJSON := Info{} + err = json.Unmarshal(s.routeInfoJSON[5:], &routeInfoJSON) // Skip "INFO " + s.mu.Unlock() + if err != nil { + t.Fatalf("Error on Unmarshal: %v", err) + } + if routeInfo.Host != expectedHost || routeInfo.Port != expectedPort || routeInfo.IP != expectedIP { + t.Fatalf("Expected host/port/IP to be %s:%v, %q, got %s:%d, %q", + expectedHost, expectedPort, expectedIP, routeInfo.Host, routeInfo.Port, routeInfo.IP) + } + // Check that server routeInfoJSON was updated too + if !reflect.DeepEqual(routeInfo, routeInfoJSON) { + t.Fatalf("Expected routeInfoJSON to be %+v, got %+v", routeInfo, routeInfoJSON) + } + } + + // Update config with cluster_advertise + updateConfig(` + listen: "0.0.0.0:-1" + cluster: { + listen: "0.0.0.0:-1" + cluster_advertise: "me:1" + } + `) + verify("me", 1, "nats-route://me:1/") + + // Update config with cluster_advertise (no port specified) + updateConfig(` + listen: "0.0.0.0:-1" + cluster: { + listen: "0.0.0.0:-1" + cluster_advertise: "me" + } + `) + verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) + + // Update config with cluster_advertise (-1 port specified) + updateConfig(` + listen: "0.0.0.0:-1" + cluster: { + listen: "0.0.0.0:-1" + cluster_advertise: "me:-1" + } + `) + verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) + + // Update to remove cluster_advertise + updateConfig(` + listen: "0.0.0.0:-1" + cluster: { + listen: "0.0.0.0:-1" + } + `) + verify("0.0.0.0", orgClusterPort, "") +} + +func TestConfigReloadClientAdvertise(t *testing.T) { + conf := "clientadv.conf" + if err := ioutil.WriteFile(conf, []byte(`listen: "0.0.0.0:-1"`), 0666); err != nil { + t.Fatalf("Error creating config file: %v", err) + } + defer os.Remove(conf) + opts, err := ProcessConfigFile(conf) + if err != nil { + stackFatalf(t, "Error processing config file: %v", err) + } + opts.NoLog = true + s := RunServer(opts) + defer s.Shutdown() + + orgPort := s.Addr().(*net.TCPAddr).Port + + updateConfig := func(content string) { + if err := ioutil.WriteFile(conf, []byte(content), 0666); err != nil { + stackFatalf(t, "Error creating config file: %v", err) + } + if err := s.Reload(); err != nil { + stackFatalf(t, "Error on reload: %v", err) + } + } + + verify := func(expectedHost string, expectedPort int) { + s.mu.Lock() + info := s.info + infoJSON := Info{clientConnectURLs: make(map[string]struct{})} + err := json.Unmarshal(s.infoJSON[5:len(s.infoJSON)-2], &infoJSON) // Skip INFO + s.mu.Unlock() + if err != nil { + stackFatalf(t, "Error on Unmarshal: %v", err) + } + if info.Host != expectedHost || info.Port != expectedPort { + stackFatalf(t, "Expected host/port to be %s:%d, got %s:%d", + expectedHost, expectedPort, info.Host, info.Port) + } + // Check that server infoJSON was updated too + if !reflect.DeepEqual(info, infoJSON) { + stackFatalf(t, "Expected infoJSON to be %+v, got %+v", info, infoJSON) + } + } + + // Update config with ClientAdvertise (port specified) + updateConfig(` + listen: "0.0.0.0:-1" + client_advertise: "me:1" + `) + verify("me", 1) + + // Update config with ClientAdvertise (no port specified) + updateConfig(` + listen: "0.0.0.0:-1" + client_advertise: "me" + `) + verify("me", orgPort) + + // Update config with ClientAdvertise (-1 port specified) + updateConfig(` + listen: "0.0.0.0:-1" + client_advertise: "me:-1" + `) + verify("me", orgPort) + + // Now remove ClientAdvertise to check that original values + // are restored. + updateConfig(`listen: "0.0.0.0:-1"`) + verify("0.0.0.0", orgPort) +} + // Ensure Reload supports changing the max connections. Test this by starting a // server with no max connections, connecting two clients, reloading with a // max connections of one, and ensuring one client is disconnected. diff --git a/server/route.go b/server/route.go index dd7ec784..c75a7e5a 100644 --- a/server/route.go +++ b/server/route.go @@ -143,7 +143,9 @@ func (c *client) processRouteInfo(info *Info) { // Send our local subscriptions to this route. s.sendLocalSubsToRoute(c) if sendInfo { - // If IP isn't already set on info + // The incoming INFO from the route will have IP set + // if it has Cluster.Advertise. In that case, use that + // otherwise contruct it from the remote TCP address. if info.IP == "" { // Need to get the remote IP address. c.mu.Lock() @@ -614,6 +616,12 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { } func (s *Server) routeAcceptLoop(ch chan struct{}) { + defer func() { + if ch != nil { + close(ch) + } + }() + // Snapshot server options. opts := s.getOpts() @@ -633,34 +641,16 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { s.Noticef("Listening for route connections on %s", hp) l, e := net.Listen("tcp", hp) if e != nil { - // We need to close this channel to avoid a deadlock - close(ch) s.Fatalf("Error listening on router port: %d - %v", opts.Cluster.Port, e) return } + s.mu.Lock() // Check for TLSConfig tlsReq := opts.Cluster.TLSConfig != nil - // Configure Cluster Advertise Address - host := opts.Cluster.Host - port = l.Addr().(*net.TCPAddr).Port - ip := "" - if opts.Cluster.RouteAdvertise != "" { - advHost, advPort, err := parseHostPort(opts.Cluster.RouteAdvertise, port) - if err != nil { - s.Errorf("setting RouteAdvertise failed %v", err) - } else { - host = advHost - port = advPort - } - ip = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort))) - } info := Info{ ID: s.info.ID, Version: s.info.Version, - Host: host, - Port: port, - IP: ip, AuthRequired: false, TLSRequired: tlsReq, SSLRequired: tlsReq, @@ -668,20 +658,33 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { MaxPayload: s.info.MaxPayload, ClientConnectURLs: clientConnectURLs, } + // If we have selected a random port... + if port == 0 { + // Write resolved port back to options. + opts.Cluster.Port = l.Addr().(*net.TCPAddr).Port + } + // Keep track of actual listen port. This will be needed in case of + // config reload. + s.clusterActualPort = opts.Cluster.Port // Check for Auth items if opts.Cluster.Username != "" { info.AuthRequired = true } s.routeInfo = info - s.generateRouteInfoJSON() - + // Possibly override Host/Port and set IP based on Cluster.Advertise + if err := s.setRouteInfoHostPortAndIP(); err != nil { + s.Fatalf("Error setting route INFO with Cluster.Advertise value of %s, err=%v", s.opts.Cluster.Advertise, err) + l.Close() + s.mu.Unlock() + return + } // Setup state that can enable shutdown - s.mu.Lock() s.routeListener = l s.mu.Unlock() // Let them know we are up close(ch) + ch = nil tmpDelay := ACCEPT_MIN_SLEEP @@ -711,6 +714,26 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { s.done <- true } +// Similar to setInfoHostPortAndGenerateJSON, but for routeInfo. +func (s *Server) setRouteInfoHostPortAndIP() error { + if s.opts.Cluster.Advertise != "" { + advHost, advPort, err := parseHostPort(s.opts.Cluster.Advertise, s.opts.Cluster.Port) + if err != nil { + return err + } + s.routeInfo.Host = advHost + s.routeInfo.Port = advPort + s.routeInfo.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort))) + } else { + s.routeInfo.Host = s.opts.Cluster.Host + s.routeInfo.Port = s.opts.Cluster.Port + s.routeInfo.IP = "" + } + // (re)generate the routeInfoJSON byte array + s.generateRouteInfoJSON() + return nil +} + // StartRouting will start the accept loop on the cluster host:port // and will actively try to connect to listed routes. func (s *Server) StartRouting(clientListenReady chan struct{}) { diff --git a/server/routes_test.go b/server/routes_test.go index 9e871489..6d7a367a 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -3,8 +3,6 @@ package server import ( - "bufio" - "encoding/json" "fmt" "net" "net/url" @@ -55,55 +53,127 @@ func TestRouteConfig(t *testing.T) { } } -func TestRouteAdvertise(t *testing.T) { - // TODO: Need to work through this test case. may need to add a util proxy server - // to validate functionally. - optsSeed, _ := ProcessConfigFile("./configs/seed.conf") +func TestClusterAdvertise(t *testing.T) { + lst, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error starting listener: %v", err) + } + ch := make(chan error) + go func() { + c, err := lst.Accept() + if err != nil { + ch <- err + return + } + c.Close() + ch <- nil + }() - optsSeed.NoSigs, optsSeed.NoLog = true, true - srvSeed := RunServer(optsSeed) - defer srvSeed.Shutdown() + optsA, _ := ProcessConfigFile("./configs/seed.conf") + optsA.NoSigs, optsA.NoLog = true, true + srvA := RunServer(optsA) + defer srvA.Shutdown() - seedRouteUrl := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host, - srvSeed.ClusterAddr().Port) - optsA := nextServerOpts(optsSeed) - optsA.Routes = RoutesFromStr(seedRouteUrl) - optsA.Cluster.Port = 9999 - optsA.Cluster.RouteAdvertise = "example.com:80" + srvARouteURL := fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, srvA.ClusterAddr().Port) + optsB := nextServerOpts(optsA) + optsB.Routes = RoutesFromStr(srvARouteURL) + + srvB := RunServer(optsB) + defer srvB.Shutdown() + + // Wait for these 2 to connect to each other + checkClusterFormed(t, srvA, srvB) + + // Now start server C that connects to A. A should ask B to connect to C, + // based on C's URL. But since C configures a Cluster.Advertise, it will connect + // to our listener. + optsC := nextServerOpts(optsB) + optsC.Cluster.Advertise = lst.Addr().String() + optsC.ClientAdvertise = "me:1" + optsC.Routes = RoutesFromStr(srvARouteURL) + + srvC := RunServer(optsC) + defer srvC.Shutdown() + + select { + case e := <-ch: + if e != nil { + t.Fatalf("Error: %v", e) + } + case <-time.After(2 * time.Second): + t.Fatalf("Test timed out") + } +} + +func TestClusterAdvertiseErrorOnStartup(t *testing.T) { + opts := DefaultOptions() + // Set invalid address + opts.Cluster.Advertise = "addr:::123" + s := New(opts) + defer s.Shutdown() + dl := &DummyLogger{} + s.SetLogger(dl, false, false) + + // Start will keep running, so start in a go-routine. + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + s.Start() + wg.Done() + }() + msg := "" + ok := false + timeout := time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + dl.Lock() + msg = dl.msg + dl.Unlock() + if strings.Contains(msg, "Cluster.Advertise") { + ok = true + break + } + } + if !ok { + t.Fatalf("Did not get expected error, got %v", msg) + } + s.Shutdown() + wg.Wait() +} + +func TestClientAdvertise(t *testing.T) { + optsA, _ := ProcessConfigFile("./configs/seed.conf") + optsA.NoSigs, optsA.NoLog = true, true srvA := RunServer(optsA) defer srvA.Shutdown() - srvA.mu.Lock() - if srvA.routeInfo.Host != "example.com" { - t.Fatalf("Expected srvA Route Advertise to be example.com:80, got: %v:%d", - srvA.routeInfo.Host, srvA.routeInfo.Port) - } - // using example.com, but don't expect anything to try to connect to it. - if srvA.routeInfo.IP != "nats-route://example.com:80/" { - t.Fatalf("Expected srvA.routeInfo.IP to be set, got %v", srvA.routeInfo.IP) - } - srvA.mu.Unlock() - srvSeed.mu.Lock() - if srvSeed.routeInfo.IP != "" { - t.Fatalf("Expected srvSeed.routeInfo.IP to not be set, got %v", srvSeed.routeInfo.IP) - } - srvSeed.mu.Unlock() + optsB := nextServerOpts(optsA) + optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port)) + optsB.ClientAdvertise = "me:1" + srvB := RunServer(optsB) + defer srvB.Shutdown() - // create a TCP client, connect to srvA Cluster and verify info. - testCn, _ := net.Dial("tcp", net.JoinHostPort(optsA.Cluster.Host, strconv.Itoa(optsA.Cluster.Port))) - defer testCn.Close() - msg, _ := bufio.NewReader(testCn).ReadString('\n') - var retInfo Info - err := json.Unmarshal([]byte(strings.TrimLeft(msg, "INFO ")), &retInfo) + checkClusterFormed(t, srvA, srvB) + + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)) if err != nil { - t.Fatalf("Unable to read response: %v", err) + t.Fatalf("Error on connect: %v", err) } - if retInfo.Host != "example.com" && retInfo.Port != optsA.Cluster.Port { - t.Fatalf("Host and Port from cluster incorrect: got %s:%d", retInfo.Host, retInfo.Port) + defer nc.Close() + timeout := time.Now().Add(time.Second) + good := false + for time.Now().Before(timeout) { + ds := nc.DiscoveredServers() + if len(ds) == 1 { + if ds[0] == "nats://me:1" { + good = true + break + } + } + time.Sleep(15 * time.Millisecond) } - if retInfo.IP != "nats-route://example.com:80/" { - t.Fatalf("IP incorrected expected: nats-route://example.com:80/, got: %s", retInfo.IP) + if !good { + t.Fatalf("Did not get expected discovered servers: %v", nc.DiscoveredServers()) } } diff --git a/server/server.go b/server/server.go index da041e5f..f1dec11a 100644 --- a/server/server.go +++ b/server/server.go @@ -86,6 +86,12 @@ type Server struct { debug int32 } + // These store the real client/cluster listen ports. They are + // required during config reload to reset the Options (after + // reload) to the actual listen port values. + clientActualPort int + clusterActualPort int + // Used by tests to check that http.Servers do // not set any timeout. monitoringServer *http.Server @@ -109,23 +115,12 @@ func New(opts *Options) *Server { tlsReq := opts.TLSConfig != nil verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert) - // configure host/port if advertise is set - host := opts.Host - port := opts.Port - if opts.ClientAdvertise != "" { - h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port) - if err == nil { - host = h - port = p - } - } - info := Info{ ID: genID(), Version: VERSION, GoVersion: runtime.Version(), - Host: host, - Port: port, + Host: opts.Host, + Port: opts.Port, AuthRequired: false, TLSRequired: tlsReq, SSLRequired: tlsReq, @@ -148,6 +143,12 @@ func New(opts *Options) *Server { s.mu.Lock() defer s.mu.Unlock() + // This is normally done in the AcceptLoop, once the + // listener has been created (possibly with random port), + // but since some tests may expect the INFO to be properly + // set after New(), let's do it now. + s.setInfoHostPortAndGenerateJSON() + // For tracking clients s.clients = make(map[uint64]*client) @@ -166,7 +167,6 @@ func New(opts *Options) *Server { // Used to setup Authorization. s.configureAuthorization() - s.generateServerInfoJSON() s.handleSignals() return s @@ -421,19 +421,19 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // to 0 at the beginning this function. So we need to get the actual port if opts.Port == 0 { // Write resolved port back to options. - _, port, err := net.SplitHostPort(l.Addr().String()) - if err != nil { - s.Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e) - s.mu.Unlock() - return - } - portNum, err := strconv.Atoi(port) - if err != nil { - s.Fatalf("Error parsing server address (%s): %s", l.Addr().String(), e) - s.mu.Unlock() - return - } - opts.Port = portNum + opts.Port = l.Addr().(*net.TCPAddr).Port + } + // Keep track of actual listen port. This will be needed in case of + // config reload. + s.clientActualPort = opts.Port + + // Now that port has been set (if it was set to RANDOM), set the + // server's info Host/Port with either values from Options or + // ClientAdvertise. Also generate the JSON byte array. + if err := s.setInfoHostPortAndGenerateJSON(); err != nil { + s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", s.opts.ClientAdvertise, err) + s.mu.Unlock() + return } s.mu.Unlock() @@ -469,6 +469,31 @@ func (s *Server) AcceptLoop(clr chan struct{}) { s.done <- true } +// This function sets the server's info Host/Port based on server Options. +// Note that this function may be called during config reload, this is why +// Host/Port may be reset to original Options if the ClientAdvertise option +// is not set (since it may have previously been). +// The function then generates the server infoJSON. +func (s *Server) setInfoHostPortAndGenerateJSON() error { + // When this function is called, opts.Port is set to the actual listen + // port (if option was originally set to RANDOM), even during a config + // reload. So use of s.opts.Port is safe. + if s.opts.ClientAdvertise != "" { + h, p, err := parseHostPort(s.opts.ClientAdvertise, s.opts.Port) + if err != nil { + return err + } + s.info.Host = h + s.info.Port = p + } else { + s.info.Host = s.opts.Host + s.info.Port = s.opts.Port + } + // (re)generate the infoJSON byte array. + s.generateServerInfoJSON() + return nil +} + // StartProfiler is called to enable dynamic profiling. func (s *Server) StartProfiler() { // Snapshot server options. diff --git a/server/server_test.go b/server/server_test.go index 5d051237..c9e8ece4 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -249,7 +249,7 @@ func TestClientAdvertiseConnectURL(t *testing.T) { s.Shutdown() opts = DefaultOptions() - opts.Cluster.Port = 0 + opts.Port = 0 opts.ClientAdvertise = "nats.example.com:7777" s = New(opts) if s.info.Host != "nats.example.com" && s.info.Port != 7777 { @@ -257,7 +257,25 @@ func TestClientAdvertiseConnectURL(t *testing.T) { s.info.Host, s.info.Port) } s.Shutdown() +} +func TestClientAdvertiseErrorOnStartup(t *testing.T) { + opts := DefaultOptions() + // Set invalid address + opts.ClientAdvertise = "addr:::123" + s := New(opts) + defer s.Shutdown() + dl := &DummyLogger{} + s.SetLogger(dl, false, false) + + // Expect this to return due to failure + s.Start() + dl.Lock() + msg := dl.msg + dl.Unlock() + if !strings.Contains(msg, "ClientAdvertise") { + t.Fatalf("Unexpected error: %v", msg) + } } func TestNoDeadlockOnStartFailure(t *testing.T) { diff --git a/server/util.go b/server/util.go index 7abb158d..05208c73 100644 --- a/server/util.go +++ b/server/util.go @@ -74,7 +74,8 @@ func secondsToDuration(seconds float64) time.Duration { return time.Duration(ttl) } -// Parse a host/port string with an optional default port +// Parse a host/port string with a default port to use +// if none (or 0 or -1) is specified in `hostPort` string. func parseHostPort(hostPort string, defaultPort int) (host string, port int, err error) { if hostPort != "" { host, sPort, err := net.SplitHostPort(hostPort) @@ -90,6 +91,9 @@ func parseHostPort(hostPort string, defaultPort int) (host string, port int, err if err != nil { return "", -1, err } + if port == 0 || port == -1 { + port = defaultPort + } return strings.TrimSpace(host), port, nil } return "", -1, errors.New("No hostport specified") diff --git a/server/util_test.go b/server/util_test.go index 24ba9611..393819c8 100644 --- a/server/util_test.go +++ b/server/util_test.go @@ -30,6 +30,42 @@ func TestParseSInt64(t *testing.T) { } } +func TestParseHostPort(t *testing.T) { + check := func(hostPort string, defaultPort int, expectedHost string, expectedPort int, expectedErr bool) { + h, p, err := parseHostPort(hostPort, defaultPort) + if expectedErr { + if err == nil { + stackFatalf(t, "Expected an error, did not get one") + } + // expected error, so we are done + return + } + if !expectedErr && err != nil { + stackFatalf(t, "Unexpected error: %v", err) + } + if expectedHost != h { + stackFatalf(t, "Expected host %q, got %q", expectedHost, h) + } + if expectedPort != p { + stackFatalf(t, "Expected port %d, got %d", expectedPort, p) + } + } + check("addr:1234", 5678, "addr", 1234, false) + check(" addr:1234 ", 5678, "addr", 1234, false) + check(" addr : 1234 ", 5678, "addr", 1234, false) + check("addr", 5678, "addr", 5678, false) + check(" addr ", 5678, "addr", 5678, false) + check("addr:-1", 5678, "addr", 5678, false) + check(" addr:-1 ", 5678, "addr", 5678, false) + check(" addr : -1 ", 5678, "addr", 5678, false) + check("addr:0", 5678, "addr", 5678, false) + check(" addr:0 ", 5678, "addr", 5678, false) + check(" addr : 0 ", 5678, "addr", 5678, false) + check("addr:addr", 0, "", 0, true) + check("addr:::1234", 0, "", 0, true) + check("", 0, "", 0, true) +} + func BenchmarkParseInt(b *testing.B) { b.SetBytes(1) n := "12345678" diff --git a/test/proto_test.go b/test/proto_test.go index c58c8257..aa5f42fa 100644 --- a/test/proto_test.go +++ b/test/proto_test.go @@ -3,6 +3,7 @@ package test import ( + "encoding/json" "testing" "time" @@ -281,3 +282,25 @@ func TestControlLineMaximums(t *testing.T) { send(pubTooLong) expect(errRe) } + +func TestServerInfoWithClientAdvertise(t *testing.T) { + opts := DefaultTestOptions + opts.Port = PROTO_TEST_PORT + opts.ClientAdvertise = "me:1" + s := RunServer(&opts) + defer s.Shutdown() + + c := createClientConn(t, opts.Host, PROTO_TEST_PORT) + defer c.Close() + + buf := expectResult(t, c, infoRe) + js := infoRe.FindAllSubmatch(buf, 1)[0][1] + var sinfo server.Info + err := json.Unmarshal(js, &sinfo) + if err != nil { + t.Fatalf("Could not unmarshal INFO json: %v\n", err) + } + if sinfo.Host != "me" || sinfo.Port != 1 { + t.Fatalf("Expected INFO Host:Port to be me:1, got %s:%d", sinfo.Host, sinfo.Port) + } +}