removed support for array of Advertise addresses. Added support for Route advertise address.

This commit is contained in:
Peter Miron
2017-11-29 11:41:08 -05:00
parent 7d34b890c6
commit 4829592107
6 changed files with 174 additions and 118 deletions

View File

@@ -22,59 +22,59 @@ import (
// ClusterOpts are options for clusters.
type ClusterOpts struct {
Host string `json:"addr"`
Port int `json:"cluster_port"`
Username string `json:"-"`
Password string `json:"-"`
AuthTimeout float64 `json:"auth_timeout"`
TLSTimeout float64 `json:"-"`
TLSConfig *tls.Config `json:"-"`
ListenStr string `json:"-"`
ClientAdvertiseStr string `json:"-"`
ClusterAdvertiseStr string `json:"-"`
NoAdvertise bool `json:"-"`
ConnectRetries int `json:"-"`
Host string `json:"addr"`
Port int `json:"cluster_port"`
Username string `json:"-"`
Password string `json:"-"`
AuthTimeout float64 `json:"auth_timeout"`
TLSTimeout float64 `json:"-"`
TLSConfig *tls.Config `json:"-"`
ListenStr string `json:"-"`
RouteAdvertise string `json:"-"`
NoAdvertise bool `json:"-"`
ConnectRetries int `json:"-"`
}
// Options block for gnatsd server.
type Options struct {
ConfigFile string `json:"-"`
Host string `json:"addr"`
Port int `json:"port"`
Trace bool `json:"-"`
Debug bool `json:"-"`
NoLog bool `json:"-"`
NoSigs bool `json:"-"`
Logtime bool `json:"-"`
MaxConn int `json:"max_connections"`
Users []*User `json:"-"`
Username string `json:"-"`
Password string `json:"-"`
Authorization string `json:"-"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPSPort int `json:"https_port"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int `json:"max_control_line"`
MaxPayload int `json:"max_payload"`
Cluster ClusterOpts `json:"cluster"`
ProfPort int `json:"-"`
PidFile string `json:"-"`
LogFile string `json:"-"`
Syslog bool `json:"-"`
RemoteSyslog string `json:"-"`
Routes []*url.URL `json:"-"`
RoutesStr string `json:"-"`
TLSTimeout float64 `json:"tls_timeout"`
TLS bool `json:"-"`
TLSVerify bool `json:"-"`
TLSCert string `json:"-"`
TLSKey string `json:"-"`
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
WriteDeadline time.Duration `json:"-"`
ConfigFile string `json:"-"`
Host string `json:"addr"`
Port int `json:"port"`
ClientAdvertise string `json:"-"`
Trace bool `json:"-"`
Debug bool `json:"-"`
NoLog bool `json:"-"`
NoSigs bool `json:"-"`
Logtime bool `json:"-"`
MaxConn int `json:"max_connections"`
Users []*User `json:"-"`
Username string `json:"-"`
Password string `json:"-"`
Authorization string `json:"-"`
PingInterval time.Duration `json:"ping_interval"`
MaxPingsOut int `json:"ping_max"`
HTTPHost string `json:"http_host"`
HTTPPort int `json:"http_port"`
HTTPSPort int `json:"https_port"`
AuthTimeout float64 `json:"auth_timeout"`
MaxControlLine int `json:"max_control_line"`
MaxPayload int `json:"max_payload"`
Cluster ClusterOpts `json:"cluster"`
ProfPort int `json:"-"`
PidFile string `json:"-"`
LogFile string `json:"-"`
Syslog bool `json:"-"`
RemoteSyslog string `json:"-"`
Routes []*url.URL `json:"-"`
RoutesStr string `json:"-"`
TLSTimeout float64 `json:"tls_timeout"`
TLS bool `json:"-"`
TLSVerify bool `json:"-"`
TLSCert string `json:"-"`
TLSKey string `json:"-"`
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
WriteDeadline time.Duration `json:"-"`
CustomClientAuthentication Authentication `json:"-"`
CustomRouterAuthentication Authentication `json:"-"`
@@ -204,6 +204,8 @@ func (o *Options) ProcessConfigFile(configFile string) error {
}
o.Host = hp.host
o.Port = hp.port
case "client_advertise":
o.ClientAdvertise = v.(string)
case "port":
o.Port = int(v.(int64))
case "host", "net":
@@ -389,8 +391,8 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
opts.Cluster.TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert
opts.Cluster.TLSConfig.RootCAs = opts.Cluster.TLSConfig.ClientCAs
opts.Cluster.TLSTimeout = tc.Timeout
case "cluster_client_advertise":
opts.Cluster.ClientAdvertiseStr = mv.(string)
case "route_advertise":
opts.Cluster.RouteAdvertise = mv.(string)
case "no_advertise":
opts.Cluster.NoAdvertise = mv.(bool)
case "connect_retries":
@@ -724,6 +726,9 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
if flagOpts.Host != "" {
opts.Host = flagOpts.Host
}
if flagOpts.ClientAdvertise != "" {
opts.ClientAdvertise = flagOpts.ClientAdvertise
}
if flagOpts.Username != "" {
opts.Username = flagOpts.Username
}
@@ -757,9 +762,6 @@ func MergeOptions(fileOpts, flagOpts *Options) *Options {
if flagOpts.Cluster.ListenStr != "" {
opts.Cluster.ListenStr = flagOpts.Cluster.ListenStr
}
if flagOpts.Cluster.ClientAdvertiseStr != "" {
opts.Cluster.ClientAdvertiseStr = flagOpts.Cluster.ClientAdvertiseStr
}
if flagOpts.Cluster.NoAdvertise {
opts.Cluster.NoAdvertise = true
}
@@ -949,6 +951,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.StringVar(&opts.Host, "addr", "", "Network host to listen on.")
fs.StringVar(&opts.Host, "a", "", "Network host to listen on.")
fs.StringVar(&opts.Host, "net", "", "Network host to listen on.")
fs.StringVar(&opts.ClientAdvertise, "client_advertise", "", "Client url for discovered servers.")
fs.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.")
fs.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.")
fs.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.")
@@ -981,8 +984,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.")
fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.")
fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.")
fs.StringVar(&opts.Cluster.ClientAdvertiseStr, "cluster_client_advertise", "", "Client url(s) for discovered servers.")
fs.StringVar(&opts.Cluster.ClusterAdvertiseStr, "cluster_advertise", "", "Cluster url(s) for discovered servers.")
fs.StringVar(&opts.Cluster.RouteAdvertise, "route_advertise", "", "Cluster url(s) for discovered servers.")
fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.")
fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries")
fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.")

View File

@@ -143,16 +143,20 @@ func (c *client) processRouteInfo(info *Info) {
// Send our local subscriptions to this route.
s.sendLocalSubsToRoute(c)
if sendInfo {
// Need to get the remote IP address.
c.mu.Lock()
switch conn := c.nc.(type) {
case *net.TCPConn, *tls.Conn:
addr := conn.RemoteAddr().(*net.TCPAddr)
info.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(addr.IP.String(), strconv.Itoa(info.Port)))
default:
info.IP = c.route.url.String()
// If IP isn't already set on info
if info.IP == "" {
// Need to get the remote IP address.
c.mu.Lock()
switch conn := c.nc.(type) {
case *net.TCPConn, *tls.Conn:
addr := conn.RemoteAddr().(*net.TCPAddr)
info.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(addr.IP.String(),
strconv.Itoa(info.Port)))
default:
info.IP = c.route.url.String()
}
c.mu.Unlock()
}
c.mu.Unlock()
// Now let the known servers know about this new route
s.forwardNewRouteInfoToKnownServers(info)
}
@@ -637,11 +641,26 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) {
// Check for TLSConfig
tlsReq := opts.Cluster.TLSConfig != nil
// Configure Cluster Advertise Address
host := opts.Cluster.Host
port = l.Addr().(*net.TCPAddr).Port
ip := ""
if opts.Cluster.RouteAdvertise != "" {
advHost, advPort, err := parseHostPort(opts.Cluster.RouteAdvertise, strconv.Itoa(port))
if err != nil {
s.Errorf("setting RouteAdvertise failed %v", err)
} else {
host = advHost
port = advPort
}
ip = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort)))
}
info := Info{
ID: s.info.ID,
Version: s.info.Version,
Host: opts.Cluster.Host,
Port: l.Addr().(*net.TCPAddr).Port,
Host: host,
Port: port,
IP: ip,
AuthRequired: false,
TLSRequired: tlsReq,
SSLRequired: tlsReq,

View File

@@ -53,6 +53,40 @@ func TestRouteConfig(t *testing.T) {
}
}
func TestRouteAdvertise(t *testing.T) {
// TODO: Need to work through this test case. may need to add a util proxy server
// to validate functionally.
optsSeed, _ := ProcessConfigFile("./configs/seed.conf")
optsSeed.NoSigs, optsSeed.NoLog = true, false
optsSeed.Debug = true
srvSeed := RunServer(optsSeed)
defer srvSeed.Shutdown()
seedRouteUrl := fmt.Sprintf("nats://%s:%d", optsSeed.Cluster.Host,
srvSeed.ClusterAddr().Port)
optsA := nextServerOpts(optsSeed)
optsA.Routes = RoutesFromStr(seedRouteUrl)
optsA.Cluster.Port = 9999
optsA.Cluster.RouteAdvertise = "example.com:80"
srvA := RunServer(optsA)
defer srvA.Shutdown()
if srvA.routeInfo.Host != "example.com" {
t.Fatalf("Expected srvA Route Advertise to be example.com:80, got: %v:%d",
srvA.routeInfo.Host, srvA.routeInfo.Port)
}
// using example.com, but don't expect anything to try to connect to it.
if srvA.routeInfo.IP != "nats-route://example.com:80/" {
t.Fatalf("Expected srvA.routeInfo.IP to be set, got %v", srvA.routeInfo.IP)
}
if srvSeed.routeInfo.IP != "" {
t.Fatalf("Expected srvSeed.routeInfo.IP to not be set, got %v", srvSeed.routeInfo.IP)
}
}
func TestServerRoutesWithClients(t *testing.T) {
optsA, _ := ProcessConfigFile("./configs/srv_a.conf")
optsB, _ := ProcessConfigFile("./configs/srv_b.conf")

View File

@@ -191,6 +191,7 @@ func (s *Server) generateRouteInfoJSON() {
return
}
s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b))
s.Errorf("route info: %s", b)
}
// PrintAndDie is exported for access in other packages.
@@ -992,6 +993,7 @@ func (s *Server) startGoRoutine(f func()) {
// getClientConnectURLs returns suitable URLs for clients to connect to the listen
// port based on the server options' Host and Port. If the Host corresponds to
// "any" interfaces, this call returns the list of resolved IP addresses.
// If ClientAdvertise is set, returns the client advertise host and port
func (s *Server) getClientConnectURLs() []string {
// Snapshot server options.
opts := s.getOpts()
@@ -1002,34 +1004,16 @@ func (s *Server) getClientConnectURLs() []string {
sPort := strconv.Itoa(opts.Port)
urls := make([]string, 0, 1)
// short circuit if cluster-advertise is set
if opts.Cluster.ClientAdvertiseStr != "" {
hosts := strings.Split(opts.Cluster.ClientAdvertiseStr, ",")
for n, i := range hosts {
host, port, err := net.SplitHostPort(i)
switch err.(type) {
case *net.AddrError:
// try appending the current port
host, port, err = net.SplitHostPort(i + ":" + sPort)
}
if err != nil {
s.Fatalf("Client Advertise Address error: %v, on entry: %s", err, i)
}
// set the info host to the first address in a list
if n == 0 {
s.info.Host = host
s.info.Port, err = strconv.Atoi(port)
}
if err != nil {
s.Fatalf("Client Advertise Address error: %v, on entry: %s", err, i)
}
urls = append(urls, net.JoinHostPort(strings.TrimSpace(host), strings.TrimSpace(port)))
// short circuit if client advertise is set
ca := opts.ClientAdvertise
if ca != "" {
host, port, err := parseHostPort(ca, sPort)
s.info.Host = host // TODO: should not set these here.
s.info.Port = port
if err != nil {
s.Errorf("Client Advertise Address %v, on: %s", err, ca)
}
urls = append(urls, net.JoinHostPort(host, strconv.Itoa(port)))
} else {
ipAddr, err := net.ResolveIPAddr("ip", opts.Host)
// If the host is "any" (0.0.0.0 or ::), get specific IPs from available

View File

@@ -213,50 +213,42 @@ func TestGetConnectURLs(t *testing.T) {
}
}
func TestClusterClientAdvertiseConnectURL(t *testing.T) {
func TestClientAdvertiseConnectURL(t *testing.T) {
opts := DefaultOptions()
opts.Port = 4222
opts.Cluster.ClientAdvertiseStr = "nats.example.com"
opts.ClientAdvertise = "nats.example.com"
s := New(opts)
defer s.Shutdown()
urls := s.getClientConnectURLs()
if len(urls) != 1 {
t.Fatalf("Expected to get one url, got none: %v with Cluster.AdvertiseStr %v",
opts.Host, opts.Cluster.ClientAdvertiseStr)
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
opts.Host, opts.ClientAdvertise)
}
if urls[0] != "nats.example.com:4222" {
t.Fatalf("Expected to get '%s', got: '%v'", "nats.example.com:4222", urls[0])
}
s.Shutdown()
opts.Cluster.ClientAdvertiseStr = "nats.example.com, nats2.example.com:7777"
opts.ClientAdvertise = "nats.example.com:7777"
s = New(opts)
urls = s.getClientConnectURLs()
if len(urls) != 2 {
t.Fatalf("Expected to get two urls, got %d: %v", len(urls), opts.Cluster.ClientAdvertiseStr)
if len(urls) != 1 {
t.Fatalf("Expected to get one url, got none: %v with ClientAdvertise %v",
opts.Host, opts.ClientAdvertise)
}
if urls[0] != "nats.example.com:4222" {
t.Fatalf("Expected 'nats.example.com:4222', got: '%v'", urls[0])
if urls[0] != "nats.example.com:7777" {
t.Fatalf("Expected 'nats.example.com:7777', got: '%v'", urls[0])
}
if urls[1] != "nats2.example.com:7777" {
t.Fatalf("Expected 'nats2.example.com:7777', got: '%v'", urls[1])
if s.info.Host != "nats.example.com" {
t.Fatalf("Expected host to be set to nats.example.com")
}
if s.info.Port != 7777 {
t.Fatalf("Expected port to be set to 7777")
}
s.Shutdown()
}
func TestClusterAdvertiseConnectURL(t *testing.T) {
}
func TestNoDeadlockOnStartFailure(t *testing.T) {
opts := DefaultOptions()
opts.Host = "x.x.x.x" // bad host

View File

@@ -3,6 +3,10 @@
package server
import (
"errors"
"net"
"strconv"
"strings"
"time"
"github.com/nats-io/nuid"
@@ -68,3 +72,24 @@ func secondsToDuration(seconds float64) time.Duration {
ttl := seconds * float64(time.Second)
return time.Duration(ttl)
}
// Parse a host/port string with an optional default port
func parseHostPort(hostPort string, defaultPort string) (host string, port int, err error) {
if hostPort != "" {
host, sPort, err := net.SplitHostPort(hostPort)
switch err.(type) {
case *net.AddrError:
// try appending the current port
host, sPort, err = net.SplitHostPort(hostPort + ":" + defaultPort)
}
if err != nil {
return "", -1, err
}
port, err = strconv.Atoi(strings.TrimSpace(sPort))
if err != nil {
return "", -1, err
}
return strings.TrimSpace(host), port, nil
}
return "", -1, errors.New("No hostport specified")
}