mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Merge pull request #936 from nats-io/leafnode_get_random_ip
LeafNode: do hostname resolution and get random one from result
This commit is contained in:
@@ -91,9 +91,6 @@ func validateLeafNode(o *Options) error {
|
||||
|
||||
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
|
||||
delay := s.getOpts().LeafNode.ReconnectInterval
|
||||
if delay == 0 {
|
||||
delay = DEFAULT_LEAF_NODE_RECONNECT
|
||||
}
|
||||
select {
|
||||
case <-time.After(delay):
|
||||
case <-s.quitCh:
|
||||
@@ -137,6 +134,21 @@ func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
|
||||
return cfg.curURL
|
||||
}
|
||||
|
||||
// Ensure that non-exported options (used in tests) have
|
||||
// been properly set.
|
||||
func (s *Server) setLeafNodeNonExportedOptions() {
|
||||
opts := s.getOpts()
|
||||
s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
|
||||
if s.leafNodeOpts.dialTimeout == 0 {
|
||||
// Use same timeouts as routes for now.
|
||||
s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
|
||||
}
|
||||
s.leafNodeOpts.resolver = opts.LeafNode.resolver
|
||||
if s.leafNodeOpts.resolver == nil {
|
||||
s.leafNodeOpts.resolver = net.DefaultResolver
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg) {
|
||||
defer s.grWG.Done()
|
||||
|
||||
@@ -145,16 +157,31 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg) {
|
||||
return
|
||||
}
|
||||
|
||||
reconnectDelay := s.getOpts().LeafNode.ReconnectInterval
|
||||
s.mu.Lock()
|
||||
dialTimeout := s.leafNodeOpts.dialTimeout
|
||||
resolver := s.leafNodeOpts.resolver
|
||||
s.mu.Unlock()
|
||||
|
||||
var conn net.Conn
|
||||
|
||||
for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
|
||||
rURL := remote.pickNextURL()
|
||||
s.Debugf("Trying to connect as leaf node to remote server on %s", rURL.Host)
|
||||
conn, err := net.DialTimeout("tcp", rURL.Host, DEFAULT_ROUTE_DIAL) // Use same timeouts as routes for now.
|
||||
url, err := s.getRandomIP(resolver, rURL.Host)
|
||||
if err == nil {
|
||||
var ipStr string
|
||||
if url != rURL.Host {
|
||||
ipStr = fmt.Sprintf(" (%s)", url)
|
||||
}
|
||||
s.Debugf("Trying to connect as leaf node to remote server on %s%s", rURL.Host, ipStr)
|
||||
conn, err = net.DialTimeout("tcp", url, dialTimeout)
|
||||
}
|
||||
if err != nil {
|
||||
s.Errorf("Error trying to connect as leaf node to remote server: %v", err)
|
||||
select {
|
||||
case <-s.quitCh:
|
||||
return
|
||||
case <-time.After(DEFAULT_ROUTE_CONNECT):
|
||||
case <-time.After(reconnectDelay):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
75
server/leafnode_test.go
Normal file
75
server/leafnode_test.go
Normal file
@@ -0,0 +1,75 @@
|
||||
// Copyright 2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type captureLeafNodeRandomIPLogger struct {
|
||||
DummyLogger
|
||||
ch chan struct{}
|
||||
ips [3]int
|
||||
}
|
||||
|
||||
func (c *captureLeafNodeRandomIPLogger) Debugf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
if strings.Contains(msg, "hostname_to_resolve") {
|
||||
ippos := strings.Index(msg, "127.0.0.")
|
||||
if ippos != -1 {
|
||||
n := int(msg[ippos+8] - '1')
|
||||
c.ips[n]++
|
||||
for _, v := range c.ips {
|
||||
if v < 2 {
|
||||
return
|
||||
}
|
||||
}
|
||||
// All IPs got at least some hit, we are done.
|
||||
c.ch <- struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeafNodeRandomIP(t *testing.T) {
|
||||
u, err := url.Parse("nats://hostname_to_resolve:1234")
|
||||
if err != nil {
|
||||
t.Fatalf("Error parsing: %v", err)
|
||||
}
|
||||
|
||||
resolver := &myDummyDNSResolver{ips: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}}
|
||||
|
||||
o := DefaultOptions()
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
o.LeafNode.Port = 0
|
||||
o.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URL: u}}
|
||||
o.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
o.LeafNode.resolver = resolver
|
||||
o.LeafNode.dialTimeout = 15 * time.Millisecond
|
||||
s := RunServer(o)
|
||||
defer s.Shutdown()
|
||||
|
||||
l := &captureLeafNodeRandomIPLogger{ch: make(chan struct{})}
|
||||
s.SetLogger(l, true, true)
|
||||
|
||||
select {
|
||||
case <-l.ch:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("Does not seem to have used random IPs")
|
||||
}
|
||||
}
|
||||
@@ -94,6 +94,10 @@ type LeafNodeOpts struct {
|
||||
Advertise string `json:"-"`
|
||||
NoAdvertise bool `json:"-"`
|
||||
ReconnectInterval time.Duration `json:"-"`
|
||||
|
||||
// Not exported, for tests.
|
||||
resolver netResolver
|
||||
dialTimeout time.Duration
|
||||
}
|
||||
|
||||
// RemoteLeafOpts are options for connecting to a remote server as a leaf node.
|
||||
@@ -2367,6 +2371,10 @@ func setBaselineOptions(opts *Options) {
|
||||
opts.LeafNode.AuthTimeout = float64(AUTH_TIMEOUT) / float64(time.Second)
|
||||
}
|
||||
}
|
||||
// Set this regardless of opts.LeafNode.Port
|
||||
if opts.LeafNode.ReconnectInterval == 0 {
|
||||
opts.LeafNode.ReconnectInterval = DEFAULT_LEAF_NODE_RECONNECT
|
||||
}
|
||||
if opts.MaxControlLine == 0 {
|
||||
opts.MaxControlLine = MAX_CONTROL_LINE_SIZE
|
||||
}
|
||||
|
||||
@@ -56,6 +56,9 @@ func TestDefaultOptions(t *testing.T) {
|
||||
WriteDeadline: DEFAULT_FLUSH_DEADLINE,
|
||||
MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS,
|
||||
LameDuckDuration: DEFAULT_LAME_DUCK_DURATION,
|
||||
LeafNode: LeafNodeOpts{
|
||||
ReconnectInterval: DEFAULT_LEAF_NODE_RECONNECT,
|
||||
},
|
||||
}
|
||||
|
||||
opts := &Options{}
|
||||
|
||||
@@ -121,6 +121,10 @@ type Server struct {
|
||||
leafNodeListener net.Listener
|
||||
leafNodeInfo Info
|
||||
leafNodeInfoJSON []byte
|
||||
leafNodeOpts struct {
|
||||
resolver netResolver
|
||||
dialTimeout time.Duration
|
||||
}
|
||||
|
||||
quitCh chan struct{}
|
||||
|
||||
@@ -235,6 +239,9 @@ func NewServer(opts *Options) (*Server, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Ensure that non-exported options (used in tests) are properly set.
|
||||
s.setLeafNodeNonExportedOptions()
|
||||
|
||||
// Used internally for quick look-ups.
|
||||
s.clientConnectURLsMap = make(map[string]struct{})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user