mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
Merge pull request #1069 from nats-io/mremotes
Convert leafnode solicited remotes to an array
This commit is contained in:
@@ -24,6 +24,7 @@ import (
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strconv"
|
||||
@@ -66,7 +67,7 @@ func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
|
||||
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
|
||||
for _, ri := range s.getOpts().LeafNode.Remotes {
|
||||
// FIXME(dlc) - What about auth changes?
|
||||
if urlsAreEqual(ri.URL, remote.URL) {
|
||||
if reflect.DeepEqual(ri.URLs, remote.URLs) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
@@ -108,7 +109,7 @@ func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
|
||||
}
|
||||
// Start with the one that is configured. We will add to this
|
||||
// array when receiving async leafnode INFOs.
|
||||
cfg.urls = append(cfg.urls, cfg.URL)
|
||||
cfg.urls = append(cfg.urls, cfg.URLs...)
|
||||
return cfg
|
||||
}
|
||||
|
||||
@@ -152,7 +153,7 @@ func (s *Server) setLeafNodeNonExportedOptions() {
|
||||
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
|
||||
defer s.grWG.Done()
|
||||
|
||||
if remote == nil || remote.URL == nil {
|
||||
if remote == nil || len(remote.URLs) == 0 {
|
||||
s.Debugf("Empty remote leafnode definition, nothing to connect")
|
||||
return
|
||||
}
|
||||
@@ -206,7 +207,7 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
|
||||
// We will put this in the normal log if first connect, does not force -DV mode to know
|
||||
// that the connect worked.
|
||||
if firstConnect {
|
||||
s.Noticef("Connected leafnode to %q", remote.RemoteLeafOpts.URL.Hostname())
|
||||
s.Noticef("Connected leafnode to %q", rURL.Hostname())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -343,7 +344,7 @@ func (c *client) sendLeafConnect(tlsRequired bool) {
|
||||
sig := base64.RawURLEncoding.EncodeToString(sigraw)
|
||||
cinfo.JWT = string(tmp)
|
||||
cinfo.Sig = sig
|
||||
} else if userInfo := c.leaf.remote.URL.User; userInfo != nil {
|
||||
} else if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
|
||||
cinfo.User = userInfo.Username()
|
||||
pass, _ := userInfo.Password()
|
||||
cinfo.Pass = pass
|
||||
@@ -524,7 +525,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
// had this advertised to us an should use the configured host
|
||||
// name for the TLS server name.
|
||||
if net.ParseIP(host) != nil {
|
||||
host, _, _ = net.SplitHostPort(c.leaf.remote.RemoteLeafOpts.URL.Host)
|
||||
host, _, _ = net.SplitHostPort(c.leaf.remote.curURL.Host)
|
||||
}
|
||||
tlsConfig.ServerName = host
|
||||
|
||||
@@ -674,15 +675,20 @@ func (c *client) updateLeafNodeURLs(info *Info) {
|
||||
c.Errorf("Error parsing url %q: %v", surl, err)
|
||||
continue
|
||||
}
|
||||
// Do not add if it's the same than the one that
|
||||
// we have configured.
|
||||
if urlsAreEqual(url, cfg.URL) {
|
||||
continue
|
||||
// Do not add if it's the same as what we already have configured.
|
||||
var dup bool
|
||||
for _, u := range cfg.URLs {
|
||||
if urlsAreEqual(url, u) {
|
||||
dup = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !dup {
|
||||
cfg.urls = append(cfg.urls, url)
|
||||
}
|
||||
cfg.urls = append(cfg.urls, url)
|
||||
}
|
||||
// Add the configured one
|
||||
cfg.urls = append(cfg.urls, cfg.URL)
|
||||
cfg.urls = append(cfg.urls, cfg.URLs...)
|
||||
}
|
||||
|
||||
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
|
||||
|
||||
@@ -59,7 +59,7 @@ func TestLeafNodeRandomIP(t *testing.T) {
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
o.LeafNode.Port = 0
|
||||
o.LeafNode.Remotes = []*RemoteLeafOpts{{URL: u}}
|
||||
o.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
|
||||
o.LeafNode.ReconnectInterval = 50 * time.Millisecond
|
||||
o.LeafNode.resolver = resolver
|
||||
o.LeafNode.dialTimeout = 15 * time.Millisecond
|
||||
|
||||
@@ -922,9 +922,9 @@ type LeafNodeOptsVarz struct {
|
||||
|
||||
// RemoteLeafOptsVarz contains monitoring remote leaf node information
|
||||
type RemoteLeafOptsVarz struct {
|
||||
LocalAccount string `json:"local_account,omitempty"`
|
||||
URL string `json:"url,omitempty"`
|
||||
TLSTimeout float64 `json:"tls_timeout,omitempty"`
|
||||
LocalAccount string `json:"local_account,omitempty"`
|
||||
TLSTimeout float64 `json:"tls_timeout,omitempty"`
|
||||
URLs []string `json:"urls,omitempty"`
|
||||
}
|
||||
|
||||
// VarzOptions are the options passed to Varz().
|
||||
@@ -1067,7 +1067,7 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz {
|
||||
for i, r := range ln.Remotes {
|
||||
rlna[i] = RemoteLeafOptsVarz{
|
||||
LocalAccount: r.LocalAccount,
|
||||
URL: r.URL.Host,
|
||||
URLs: urlsToStrings(r.URLs),
|
||||
TLSTimeout: r.TLSTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2492,7 +2492,7 @@ func TestMonitorLeafNode(t *testing.T) {
|
||||
opts.LeafNode.Remotes = []*RemoteLeafOpts{
|
||||
&RemoteLeafOpts{
|
||||
LocalAccount: "acc",
|
||||
URL: u,
|
||||
URLs: []*url.URL{u},
|
||||
TLSTimeout: 1,
|
||||
},
|
||||
}
|
||||
@@ -2506,9 +2506,7 @@ func TestMonitorLeafNode(t *testing.T) {
|
||||
opts.LeafNode.TLSTimeout,
|
||||
[]RemoteLeafOptsVarz{
|
||||
{
|
||||
"acc",
|
||||
"localhost:1234",
|
||||
1,
|
||||
"acc", 1, []string{"localhost:1234"},
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -2527,7 +2525,7 @@ func TestMonitorLeafNode(t *testing.T) {
|
||||
|
||||
// Having this here to make sure that if fields are added in ClusterOptsVarz,
|
||||
// we make sure to update this test (compiler will report an error if we don't)
|
||||
_ = LeafNodeOptsVarz{"", 0, 0, 0, []RemoteLeafOptsVarz{{"", "", 0}}}
|
||||
_ = LeafNodeOptsVarz{"", 0, 0, 0, []RemoteLeafOptsVarz{{"", 0, nil}}}
|
||||
|
||||
// Alter the fields to make sure that we have a proper deep copy
|
||||
// of what may be stored in the server. Anything we change here
|
||||
@@ -2537,7 +2535,7 @@ func TestMonitorLeafNode(t *testing.T) {
|
||||
v.LeafNode.AuthTimeout = 1234.5
|
||||
v.LeafNode.TLSTimeout = 1234.5
|
||||
v.LeafNode.Remotes[0].LocalAccount = "wrong"
|
||||
v.LeafNode.Remotes[0].URL = "wrong"
|
||||
v.LeafNode.Remotes[0].URLs = append(v.LeafNode.Remotes[0].URLs, "wrong")
|
||||
v.LeafNode.Remotes[0].TLSTimeout = 1234.5
|
||||
v = pollVarz(t, s, mode, varzURL, nil)
|
||||
check(t, v)
|
||||
|
||||
@@ -102,8 +102,6 @@ type RemoteGatewayOpts struct {
|
||||
}
|
||||
|
||||
// LeafNodeOpts are options for a given server to accept leaf node connections and/or connect to a remote cluster.
|
||||
// NOTE: This structure is no longer used for monitoring endpoints
|
||||
// and json tags are deprecated and may be removed in the future.
|
||||
type LeafNodeOpts struct {
|
||||
Host string `json:"addr,omitempty"`
|
||||
Port int `json:"port,omitempty"`
|
||||
@@ -124,11 +122,9 @@ type LeafNodeOpts struct {
|
||||
}
|
||||
|
||||
// RemoteLeafOpts are options for connecting to a remote server as a leaf node.
|
||||
// NOTE: This structure is no longer used for monitoring endpoints
|
||||
// and json tags are deprecated and may be removed in the future.
|
||||
type RemoteLeafOpts struct {
|
||||
LocalAccount string `json:"local_account,omitempty"`
|
||||
URL *url.URL `json:"url,omitempty"`
|
||||
URLs []*url.URL `json:"urls,omitempty"`
|
||||
Credentials string `json:"-"`
|
||||
TLS bool `json:"-"`
|
||||
TLSConfig *tls.Config `json:"-"`
|
||||
@@ -1085,7 +1081,14 @@ func parseRemoteLeafNodes(v interface{}, errors *[]error, warnings *[]error) ([]
|
||||
*errors = append(*errors, &configErr{tk, err.Error()})
|
||||
continue
|
||||
}
|
||||
remote.URL = url
|
||||
remote.URLs = append(remote.URLs, url)
|
||||
case "urls":
|
||||
urls, errs := parseURLs(v.([]interface{}), "leafnode")
|
||||
if errs != nil {
|
||||
*errors = append(*errors, errs...)
|
||||
continue
|
||||
}
|
||||
remote.URLs = urls
|
||||
case "account", "local":
|
||||
remote.LocalAccount = v.(string)
|
||||
case "creds", "credentials":
|
||||
@@ -2437,8 +2440,12 @@ func setBaselineOptions(opts *Options) {
|
||||
}
|
||||
// Set baseline connect port for remotes.
|
||||
for _, r := range opts.LeafNode.Remotes {
|
||||
if r != nil && r.URL.Port() == "" {
|
||||
r.URL.Host = net.JoinHostPort(r.URL.Host, strconv.Itoa(DEFAULT_LEAFNODE_PORT))
|
||||
if r != nil {
|
||||
for _, u := range r.URLs {
|
||||
if u.Port() == "" {
|
||||
u.Host = net.JoinHostPort(u.Host, strconv.Itoa(DEFAULT_LEAFNODE_PORT))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1833,7 +1833,8 @@ func TestParsingLeafNodeRemotes(t *testing.T) {
|
||||
LocalAccount: "foobar",
|
||||
Credentials: "./my.creds",
|
||||
}
|
||||
expected.URL, _ = url.Parse("nats-leaf://127.0.0.1:2222")
|
||||
u, _ := url.Parse("nats-leaf://127.0.0.1:2222")
|
||||
expected.URLs = append(expected.URLs, u)
|
||||
if !reflect.DeepEqual(opts.LeafNode.Remotes[0], expected) {
|
||||
t.Fatalf("Expected %v, got %v", expected, opts.LeafNode.Remotes[0])
|
||||
}
|
||||
|
||||
@@ -1217,7 +1217,7 @@ func TestConnectErrorReports(t *testing.T) {
|
||||
// Now try with leaf nodes
|
||||
opts.Cluster.Port = 0
|
||||
opts.Routes = nil
|
||||
opts.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URL: remoteURLs[0]}}
|
||||
opts.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{remoteURLs[0]}}}
|
||||
opts.LeafNode.ReconnectInterval = 15 * time.Millisecond
|
||||
s = RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
@@ -1393,7 +1393,7 @@ func TestReconnectErrorReports(t *testing.T) {
|
||||
opts.Cluster.Port = 0
|
||||
opts.Routes = nil
|
||||
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", csOpts.LeafNode.Port))
|
||||
opts.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URL: u}}
|
||||
opts.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{u}}}
|
||||
opts.LeafNode.ReconnectInterval = 15 * time.Millisecond
|
||||
s = RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -67,7 +67,7 @@ func runSolicitLeafServerToURL(surl string) (*server.Server, *server.Options) {
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
rurl, _ := url.Parse(surl)
|
||||
o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URL: rurl}}
|
||||
o.LeafNode.Remotes = []*server.RemoteLeafOpts{{URLs: []*url.URL{rurl}}}
|
||||
o.LeafNode.ReconnectInterval = 100 * time.Millisecond
|
||||
return RunServer(&o), &o
|
||||
}
|
||||
@@ -931,7 +931,7 @@ func runTLSSolicitLeafServer(lso *server.Options) (*server.Server, *server.Optio
|
||||
o.Host = "127.0.0.1"
|
||||
o.Port = -1
|
||||
rurl, _ := url.Parse(fmt.Sprintf("nats-leaf://%s:%d", lso.LeafNode.Host, lso.LeafNode.Port))
|
||||
remote := &server.RemoteLeafOpts{URL: rurl}
|
||||
remote := &server.RemoteLeafOpts{URLs: []*url.URL{rurl}}
|
||||
remote.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
|
||||
host, _, _ := net.SplitHostPort(lso.LeafNode.Host)
|
||||
remote.TLSConfig.ServerName = host
|
||||
@@ -2458,3 +2458,27 @@ func TestLeafNodesStaggeredSubPub(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestLeafNodeMultipleRemoteURLs(t *testing.T) {
|
||||
s, opts := runLeafServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
content := `
|
||||
port: -1
|
||||
leafnodes {
|
||||
remotes = [
|
||||
{
|
||||
urls: [nats-leaf://127.0.0.1:%d,nats-leaf://localhost:%d]
|
||||
}
|
||||
]
|
||||
}
|
||||
`
|
||||
|
||||
config := fmt.Sprintf(content, opts.LeafNode.Port, opts.LeafNode.Port)
|
||||
conf := createConfFile(t, []byte(config))
|
||||
sl, _ := RunServerWithConfig(conf)
|
||||
defer os.Remove(conf)
|
||||
defer sl.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, s)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user