From 515ca5e70fe4e99e189c31f000de9862986d6c49 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 9 Apr 2019 16:22:49 -0600 Subject: [PATCH] LeafNode: do hostname resolution and get random one from result This is similar to what we do with Gateways. Signed-off-by: Ivan Kozlovic --- server/leafnode.go | 39 +++++++++++++++++---- server/leafnode_test.go | 75 +++++++++++++++++++++++++++++++++++++++++ server/opts.go | 8 +++++ server/opts_test.go | 3 ++ server/server.go | 7 ++++ 5 files changed, 126 insertions(+), 6 deletions(-) create mode 100644 server/leafnode_test.go diff --git a/server/leafnode.go b/server/leafnode.go index 3b038b07..5a817232 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -91,9 +91,6 @@ func validateLeafNode(o *Options) error { func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) { delay := s.getOpts().LeafNode.ReconnectInterval - if delay == 0 { - delay = DEFAULT_LEAF_NODE_RECONNECT - } select { case <-time.After(delay): case <-s.quitCh: @@ -137,6 +134,21 @@ func (cfg *leafNodeCfg) getCurrentURL() *url.URL { return cfg.curURL } +// Ensure that non-exported options (used in tests) have +// been properly set. +func (s *Server) setLeafNodeNonExportedOptions() { + opts := s.getOpts() + s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout + if s.leafNodeOpts.dialTimeout == 0 { + // Use same timeouts as routes for now. + s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL + } + s.leafNodeOpts.resolver = opts.LeafNode.resolver + if s.leafNodeOpts.resolver == nil { + s.leafNodeOpts.resolver = net.DefaultResolver + } +} + func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg) { defer s.grWG.Done() @@ -145,16 +157,31 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg) { return } + reconnectDelay := s.getOpts().LeafNode.ReconnectInterval + s.mu.Lock() + dialTimeout := s.leafNodeOpts.dialTimeout + resolver := s.leafNodeOpts.resolver + s.mu.Unlock() + + var conn net.Conn + for s.isRunning() && s.remoteLeafNodeStillValid(remote) { rURL := remote.pickNextURL() - s.Debugf("Trying to connect as leaf node to remote server on %s", rURL.Host) - conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL) // Use same timeouts as routes for now. + url, err := s.getRandomIP(resolver, rURL.Host) + if err == nil { + var ipStr string + if url != rURL.Host { + ipStr = fmt.Sprintf(" (%s)", url) + } + s.Debugf("Trying to connect as leaf node to remote server on %s%s", rURL.Host, ipStr) + conn, err = net.DialTimeout("tcp", url, dialTimeout) + } if err != nil { s.Errorf("Error trying to connect as leaf node to remote server: %v", err) select { case <-s.quitCh: return - case <-time.After(DEFAULT_ROUTE_CONNECT): + case <-time.After(reconnectDelay): continue } } diff --git a/server/leafnode_test.go b/server/leafnode_test.go new file mode 100644 index 00000000..be954f83 --- /dev/null +++ b/server/leafnode_test.go @@ -0,0 +1,75 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "net/url" + "strings" + "testing" + "time" +) + +type captureLeafNodeRandomIPLogger struct { + DummyLogger + ch chan struct{} + ips [3]int +} + +func (c *captureLeafNodeRandomIPLogger) Debugf(format string, v ...interface{}) { + msg := fmt.Sprintf(format, v...) + if strings.Contains(msg, "hostname_to_resolve") { + ippos := strings.Index(msg, "127.0.0.") + if ippos != -1 { + n := int(msg[ippos+8] - '1') + c.ips[n]++ + for _, v := range c.ips { + if v < 2 { + return + } + } + // All IPs got at least some hit, we are done. + c.ch <- struct{}{} + } + } +} + +func TestLeafNodeRandomIP(t *testing.T) { + u, err := url.Parse("nats://hostname_to_resolve:1234") + if err != nil { + t.Fatalf("Error parsing: %v", err) + } + + resolver := &myDummyDNSResolver{ips: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}} + + o := DefaultOptions() + o.Host = "127.0.0.1" + o.Port = -1 + o.LeafNode.Port = 0 + o.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URL: u}} + o.LeafNode.ReconnectInterval = 50 * time.Millisecond + o.LeafNode.resolver = resolver + o.LeafNode.dialTimeout = 15 * time.Millisecond + s := RunServer(o) + defer s.Shutdown() + + l := &captureLeafNodeRandomIPLogger{ch: make(chan struct{})} + s.SetLogger(l, true, true) + + select { + case <-l.ch: + case <-time.After(3 * time.Second): + t.Fatalf("Does not seem to have used random IPs") + } +} diff --git a/server/opts.go b/server/opts.go index d01f8daa..1b5f3235 100644 --- a/server/opts.go +++ b/server/opts.go @@ -94,6 +94,10 @@ type LeafNodeOpts struct { Advertise string `json:"-"` NoAdvertise bool `json:"-"` ReconnectInterval time.Duration `json:"-"` + + // Not exported, for tests. + resolver netResolver + dialTimeout time.Duration } // RemoteLeafOpts are options for connecting to a remote server as a leaf node. @@ -2367,6 +2371,10 @@ func setBaselineOptions(opts *Options) { opts.LeafNode.AuthTimeout = float64(AUTH_TIMEOUT) / float64(time.Second) } } + // Set this regardless of opts.LeafNode.Port + if opts.LeafNode.ReconnectInterval == 0 { + opts.LeafNode.ReconnectInterval = DEFAULT_LEAF_NODE_RECONNECT + } if opts.MaxControlLine == 0 { opts.MaxControlLine = MAX_CONTROL_LINE_SIZE } diff --git a/server/opts_test.go b/server/opts_test.go index 8cc1841b..38b3358f 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -56,6 +56,9 @@ func TestDefaultOptions(t *testing.T) { WriteDeadline: DEFAULT_FLUSH_DEADLINE, MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS, LameDuckDuration: DEFAULT_LAME_DUCK_DURATION, + LeafNode: LeafNodeOpts{ + ReconnectInterval: DEFAULT_LEAF_NODE_RECONNECT, + }, } opts := &Options{} diff --git a/server/server.go b/server/server.go index 980b59b7..ca2d54b2 100644 --- a/server/server.go +++ b/server/server.go @@ -121,6 +121,10 @@ type Server struct { leafNodeListener net.Listener leafNodeInfo Info leafNodeInfoJSON []byte + leafNodeOpts struct { + resolver netResolver + dialTimeout time.Duration + } quitCh chan struct{} @@ -235,6 +239,9 @@ func NewServer(opts *Options) (*Server, error) { s.mu.Lock() defer s.mu.Unlock() + // Ensure that non-exported options (used in tests) are properly set. + s.setLeafNodeNonExportedOptions() + // Used internally for quick look-ups. s.clientConnectURLsMap = make(map[string]struct{})