mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 18:20:42 -07:00
Merge pull request #850 from nats-io/gw_fix_address_gossip
Resolve IP if gateway listen is 0.0.0.0 or ::
This commit is contained in:
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.0.0-beta.4"
|
||||
VERSION = "2.0.0-beta.5"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -399,6 +399,10 @@ func (s *Server) gatewayAcceptLoop(ch chan struct{}) {
|
||||
|
||||
// Similar to setInfoHostPortAndGenerateJSON, but for gatewayInfo.
|
||||
func (s *Server) setGatewayInfoHostPort(info *Info, o *Options) error {
|
||||
gw := s.gateway
|
||||
gw.Lock()
|
||||
defer gw.Unlock()
|
||||
delete(gw.URLs, gw.URL)
|
||||
if o.Gateway.Advertise != "" {
|
||||
advHost, advPort, err := parseHostPort(o.Gateway.Advertise, o.Gateway.Port)
|
||||
if err != nil {
|
||||
@@ -409,17 +413,22 @@ func (s *Server) setGatewayInfoHostPort(info *Info, o *Options) error {
|
||||
} else {
|
||||
info.Host = o.Gateway.Host
|
||||
info.Port = o.Gateway.Port
|
||||
// If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
|
||||
// This will return at most 1 IP.
|
||||
ips, err := s.resolveIfHostIsAnyAddress(info.Host, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(ips) > 0 {
|
||||
info.Host = ips[0]
|
||||
}
|
||||
}
|
||||
gw := s.gateway
|
||||
gw.Lock()
|
||||
delete(gw.URLs, gw.URL)
|
||||
gw.URL = net.JoinHostPort(info.Host, strconv.Itoa(info.Port))
|
||||
gw.URLs[gw.URL] = struct{}{}
|
||||
gw.info = info
|
||||
info.GatewayURL = gw.URL
|
||||
// (re)generate the gatewayInfoJSON byte array
|
||||
gw.generateInfoJSON()
|
||||
gw.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -535,6 +535,107 @@ func TestGatewayListenError(t *testing.T) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestGatewayWithListenToAny(t *testing.T) {
|
||||
confB1 := createConfFile(t, []byte(`
|
||||
listen: "127.0.0.1:-1"
|
||||
cluster {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
gateway {
|
||||
name: "B"
|
||||
listen: "0.0.0.0:-1"
|
||||
}
|
||||
`))
|
||||
sb1, ob1 := RunServerWithConfig(confB1)
|
||||
defer sb1.Shutdown()
|
||||
|
||||
confB2 := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: "127.0.0.1:-1"
|
||||
cluster {
|
||||
listen: "127.0.0.1:-1"
|
||||
routes: ["%s"]
|
||||
}
|
||||
gateway {
|
||||
name: "B"
|
||||
listen: "0.0.0.0:-1"
|
||||
}
|
||||
`, fmt.Sprintf("nats://127.0.0.1:%d", sb1.ClusterAddr().Port))))
|
||||
sb2, ob2 := RunServerWithConfig(confB2)
|
||||
defer sb2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, sb1, sb2)
|
||||
|
||||
confA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: "127.0.0.1:-1"
|
||||
cluster {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
gateway {
|
||||
name: "A"
|
||||
listen: "0.0.0.0:-1"
|
||||
gateways [
|
||||
{
|
||||
name: "B"
|
||||
urls: ["%s", "%s"]
|
||||
}
|
||||
]
|
||||
}
|
||||
`, fmt.Sprintf("nats://127.0.0.1:%d", ob1.Gateway.Port), fmt.Sprintf("nats://127.0.0.1:%d", ob2.Gateway.Port))))
|
||||
oa := LoadConfig(confA)
|
||||
oa.gatewaysSolicitDelay = 15 * time.Millisecond
|
||||
sa := runGatewayServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
waitForOutboundGateways(t, sa, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb1, 1, time.Second)
|
||||
waitForOutboundGateways(t, sb2, 1, time.Second)
|
||||
waitForInboundGateways(t, sa, 2, time.Second)
|
||||
|
||||
checkAll := func(t *testing.T) {
|
||||
t.Helper()
|
||||
checkURL := func(t *testing.T, s *Server) {
|
||||
t.Helper()
|
||||
url := s.getGatewayURL()
|
||||
if strings.HasPrefix(url, "0.0.0.0") {
|
||||
t.Fatalf("URL still references 0.0.0.0")
|
||||
}
|
||||
s.gateway.RLock()
|
||||
for url := range s.gateway.URLs {
|
||||
if strings.HasPrefix(url, "0.0.0.0") {
|
||||
s.gateway.RUnlock()
|
||||
t.Fatalf("URL still references 0.0.0.0")
|
||||
}
|
||||
}
|
||||
s.gateway.RUnlock()
|
||||
|
||||
var cfg *gatewayCfg
|
||||
if s.getGatewayName() == "A" {
|
||||
cfg = s.getRemoteGateway("B")
|
||||
} else {
|
||||
cfg = s.getRemoteGateway("A")
|
||||
}
|
||||
urls := cfg.getURLs()
|
||||
for _, url := range urls {
|
||||
if strings.HasPrefix(url.Host, "0.0.0.0") {
|
||||
t.Fatalf("URL still references 0.0.0.0")
|
||||
}
|
||||
}
|
||||
}
|
||||
checkURL(t, sb1)
|
||||
checkURL(t, sb2)
|
||||
checkURL(t, sa)
|
||||
}
|
||||
checkAll(t)
|
||||
// Perform a reload and ensure that nothing has changed
|
||||
servers := []*Server{sb1, sb2, sa}
|
||||
for _, s := range servers {
|
||||
if err := s.Reload(); err != nil {
|
||||
t.Fatalf("Error on reload: %v", err)
|
||||
}
|
||||
checkAll(t)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayAdvertise(t *testing.T) {
|
||||
o3 := testDefaultOptionsForGateway("C")
|
||||
s3 := runGatewayServer(o3)
|
||||
@@ -1215,6 +1316,20 @@ func TestGatewayAccountInterest(t *testing.T) {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
checkFor(t, time.Second, 15*time.Millisecond, func() error {
|
||||
ei, inMap := gwcc.gw.outsim.Load("$foo")
|
||||
if !inMap {
|
||||
return fmt.Errorf("Did not receive subject no interest")
|
||||
}
|
||||
e := ei.(*outsie)
|
||||
e.RLock()
|
||||
_, inMap = e.ni["foo"]
|
||||
e.RUnlock()
|
||||
if !inMap {
|
||||
return fmt.Errorf("Did not receive subject no interest")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
// Second send should not go through to B
|
||||
natsPub(t, nc, "foo", []byte("hello"))
|
||||
natsFlush(t, nc)
|
||||
|
||||
@@ -1811,29 +1811,9 @@ func (s *Server) getClientConnectURLs() []string {
|
||||
urls = append(urls, net.JoinHostPort(s.info.Host, strconv.Itoa(s.info.Port)))
|
||||
} else {
|
||||
sPort := strconv.Itoa(opts.Port)
|
||||
ipAddr, err := net.ResolveIPAddr("ip", opts.Host)
|
||||
// If the host is "any" (0.0.0.0 or ::), get specific IPs from available
|
||||
// interfaces.
|
||||
if err == nil && ipAddr.IP.IsUnspecified() {
|
||||
var ip net.IP
|
||||
ifaces, _ := net.Interfaces()
|
||||
for _, i := range ifaces {
|
||||
addrs, _ := i.Addrs()
|
||||
for _, addr := range addrs {
|
||||
switch v := addr.(type) {
|
||||
case *net.IPNet:
|
||||
ip = v.IP
|
||||
case *net.IPAddr:
|
||||
ip = v.IP
|
||||
}
|
||||
// Skip non global unicast addresses
|
||||
if !ip.IsGlobalUnicast() || ip.IsUnspecified() {
|
||||
ip = nil
|
||||
continue
|
||||
}
|
||||
urls = append(urls, net.JoinHostPort(ip.String(), sPort))
|
||||
}
|
||||
}
|
||||
ips, err := s.resolveIfHostIsAnyAddress(opts.Host, true)
|
||||
for _, ip := range ips {
|
||||
urls = append(urls, net.JoinHostPort(ip, sPort))
|
||||
}
|
||||
if err != nil || len(urls) == 0 {
|
||||
// We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some
|
||||
@@ -1849,10 +1829,48 @@ func (s *Server) getClientConnectURLs() []string {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return urls
|
||||
}
|
||||
|
||||
// Returns an array of non local IPs if the provided host is
|
||||
// 0.0.0.0 or ::. It returns the first resolved if `all` is
|
||||
// false.
|
||||
func (s *Server) resolveIfHostIsAnyAddress(host string, all bool) ([]string, error) {
|
||||
ipAddr, err := net.ResolveIPAddr("ip", host)
|
||||
if err != nil {
|
||||
s.Errorf("Error resolving host %q: %v", host, err)
|
||||
return nil, err
|
||||
}
|
||||
// If this is not 0.0.0.0 or :: we have nothing to do.
|
||||
if !ipAddr.IP.IsUnspecified() {
|
||||
return nil, nil
|
||||
}
|
||||
var ips []string
|
||||
var ip net.IP
|
||||
ifaces, _ := net.Interfaces()
|
||||
for _, i := range ifaces {
|
||||
addrs, _ := i.Addrs()
|
||||
for _, addr := range addrs {
|
||||
switch v := addr.(type) {
|
||||
case *net.IPNet:
|
||||
ip = v.IP
|
||||
case *net.IPAddr:
|
||||
ip = v.IP
|
||||
}
|
||||
// Skip non global unicast addresses
|
||||
if !ip.IsGlobalUnicast() || ip.IsUnspecified() {
|
||||
ip = nil
|
||||
continue
|
||||
}
|
||||
ips = append(ips, ip.String())
|
||||
if !all {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return ips, nil
|
||||
}
|
||||
|
||||
// if the ip is not specified, attempt to resolve it
|
||||
func resolveHostPorts(addr net.Listener) []string {
|
||||
hostPorts := make([]string, 0)
|
||||
|
||||
Reference in New Issue
Block a user