diff --git a/server/client.go b/server/client.go index 8f3e19b6..3b86af09 100644 --- a/server/client.go +++ b/server/client.go @@ -206,6 +206,7 @@ const ( DuplicateClientID DuplicateServerName MinimumVersionRequired + ClusterNamesIdentical ) // Some flags passed to processMsgResults diff --git a/server/client_test.go b/server/client_test.go index 8a5aa6af..66312fc9 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -2526,6 +2526,7 @@ func TestClientClampMaxSubsErrReport(t *testing.T) { s1.SetLogger(l, false, false) o2 := DefaultOptions() + o2.Cluster.Name = "xyz" u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port)) o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}} s2 := RunServer(o2) diff --git a/server/errors.go b/server/errors.go index 56b5af3f..91dbcfbd 100644 --- a/server/errors.go +++ b/server/errors.go @@ -78,6 +78,10 @@ var ( // attempted to connect to the leaf node listen port. ErrClientConnectedToLeafNodePort = errors.New("attempted to connect to leaf node port") + // ErrLeafNodeHasSameClusterName represents an error condition when a leafnode is a cluster + // and it has the same cluster name as the hub cluster. + ErrLeafNodeHasSameClusterName = errors.New("remote leafnode has same cluster name") + // ErrConnectedToWrongPort represents an error condition when a connection is attempted // to the wrong listen port (for instance a LeafNode to a client port, etc...) ErrConnectedToWrongPort = errors.New("attempted to connect to wrong port") diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 8082e6a8..7609a67f 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -11194,7 +11194,7 @@ func TestJetStreamClusterConsumerAndStreamNamesWithPathSeparators(t *testing.T) } func TestJetStreamClusterFilteredMirrors(t *testing.T) { - c := createJetStreamClusterExplicit(t, "MSR", 5) + c := createJetStreamClusterExplicit(t, "MSR", 3) defer c.shutdown() // Client for API requests. @@ -11250,5 +11250,22 @@ func TestJetStreamClusterFilteredMirrors(t *testing.T) { require_True(t, meta.Sequence.Stream == sseq) sseq += 3 } - +} + +// Test for making sure we error on same cluster name. +func TestJetStreamClusterSameClusterLeafNodes(t *testing.T) { + c := createJetStreamCluster(t, jsClusterAccountsTempl, "SAME", _EMPTY_, 3, 11233, true) + defer c.shutdown() + + // Do by hand since by default we check for connections. + tmpl := c.createLeafSolicit(jsClusterTemplWithLeafNode) + lc := createJetStreamCluster(t, tmpl, "SAME", "S-", 2, 22111, false) + defer lc.shutdown() + + time.Sleep(200 * time.Millisecond) + + // Make sure no leafnodes are connected. + for _, s := range lc.servers { + checkLeafNodeConnectedCount(t, s, 0) + } } diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 6ff138b7..fd87e08b 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -410,16 +410,22 @@ func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string } func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster { + sc.t.Helper() + // Create our leafnode cluster template first. return sc.createLeafNodesWithDomain(clusterName, numServers, "") } func (sc *supercluster) createLeafNodesWithDomain(clusterName string, numServers int, domain string) *cluster { + sc.t.Helper() + // Create our leafnode cluster template first. return sc.randomCluster().createLeafNodes(clusterName, numServers, domain) } func (sc *supercluster) createSingleLeafNode(extend bool) *Server { + sc.t.Helper() + return sc.randomCluster().createLeafNode(extend) } @@ -828,14 +834,17 @@ var jsLeafFrag = ` ` func (c *cluster) createLeafNodes(clusterName string, numServers int, domain string) *cluster { + c.t.Helper() return c.createLeafNodesWithStartPortAndDomain(clusterName, numServers, 22111, domain) } func (c *cluster) createLeafNodesNoJS(clusterName string, numServers int) *cluster { + c.t.Helper() return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNodeNoJS, clusterName, numServers, 21333) } func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numServers int, portStart int, domain string) *cluster { + c.t.Helper() if domain == _EMPTY_ { return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNode, clusterName, numServers, portStart) } @@ -844,6 +853,7 @@ func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numS } func (c *cluster) createLeafNode(extend bool) *Server { + c.t.Helper() if extend { return c.createLeafNodeWithTemplate("LNS", strings.ReplaceAll(jsClusterTemplWithSingleLeafNode, "store_dir:", " extension_hint: will_extend, store_dir:")) @@ -853,6 +863,7 @@ func (c *cluster) createLeafNode(extend bool) *Server { } func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server { + c.t.Helper() tmpl := c.createLeafSolicit(template) conf := fmt.Sprintf(tmpl, name, createDir(c.t, JetStreamStoreDir)) s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf))) @@ -863,6 +874,8 @@ func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server { // Helper to generate the leaf solicit configs. func (c *cluster) createLeafSolicit(tmpl string) string { + c.t.Helper() + // Create our leafnode cluster template first. var lns, lnss []string for _, s := range c.servers { @@ -880,6 +893,8 @@ func (c *cluster) createLeafSolicit(tmpl string) string { } func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName string, numJsServers, numNonServers int, doJSConfig bool) *cluster { + c.t.Helper() + // Create our leafnode cluster template first. tmpl := c.createLeafSolicit(template) pre := clusterName + "-" @@ -894,6 +909,8 @@ func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName str } func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName string, numServers int, portStart int) *cluster { + c.t.Helper() + // Create our leafnode cluster template first. tmpl := c.createLeafSolicit(template) pre := clusterName + "-" @@ -906,6 +923,8 @@ func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName // Will add in the mapping for the account to each server. func (c *cluster) addSubjectMapping(account, src, dest string) { + c.t.Helper() + for _, s := range c.servers { if s.ClusterName() != c.name { continue diff --git a/server/leafnode.go b/server/leafnode.go index 78adef19..c3e2dfd3 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1,4 +1,4 @@ -// Copyright 2019-2021 The NATS Authors +// Copyright 2019-2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -49,6 +49,9 @@ const leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second // connection is closed and it won't attempt to reconnect for that long. const leafNodeReconnectAfterPermViolation = 30 * time.Second +// When we have the same cluster name as the hub. +const leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second + // Prefix for loop detection subject const leafNodeLoopDetectionSubjectPrefix = "$LDS." @@ -1382,6 +1385,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return err } + // Check for cluster name collisions. + if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn { + c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error()) + c.closeConnection(ClusterNamesIdentical) + return ErrLeafNodeHasSameClusterName + } + // Reject if this has Gateway which means that it would be from a gateway // connection that incorrectly connects to the leafnode port. if proto.Gateway != _EMPTY_ { @@ -2269,6 +2279,13 @@ func (c *client) leafPermViolation(pub bool, subj []byte) { // Invoked from generic processErr() for LEAF connections. func (c *client) leafProcessErr(errStr string) { + // Check if we got a cluster name collision. + if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) { + _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame) + c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay) + return + } + // We will look for Loop detected error coming from the other side. // If we solicit, set the connect delay. if !strings.Contains(errStr, "Loop detected") { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 33519bd1..19c62e50 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -360,6 +360,7 @@ func TestLeafNodeAccountNotFound(t *testing.T) { u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ob.LeafNode.Port)) oa := DefaultOptions() + oa.Cluster.Name = "xyz" oa.LeafNode.ReconnectInterval = 10 * time.Millisecond oa.LeafNode.Remotes = []*RemoteLeafOpts{ { @@ -498,6 +499,7 @@ func TestLeafNodeRTT(t *testing.T) { lnBURL, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ob.LeafNode.Port)) oa := DefaultOptions() + oa.Cluster.Name = "xyz" oa.PingInterval = 15 * time.Millisecond oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{lnBURL}}} sa := RunServer(oa) @@ -562,6 +564,7 @@ func TestLeafNodeRTT(t *testing.T) { // Now check that initial RTT is computed prior to first PingInterval // Get new options to avoid possible race changing the ping interval. ob = DefaultOptions() + ob.Cluster.Name = "xyz" ob.PingInterval = time.Minute ob.LeafNode.Host = "127.0.0.1" ob.LeafNode.Port = -1 @@ -857,6 +860,7 @@ func TestLeafNodeLoop(t *testing.T) { sa.SetLogger(l, false, false) ob := DefaultOptions() + ob.Cluster.Name = "xyz" ob.LeafNode.ReconnectInterval = 10 * time.Millisecond ob.LeafNode.Port = 5678 ua, _ := url.Parse("nats://127.0.0.1:1234") @@ -1262,6 +1266,7 @@ func TestLeafNodePermissions(t *testing.T) { u, _ := url.Parse(fmt.Sprintf("nats://%s:%d", lo1.LeafNode.Host, lo1.LeafNode.Port)) lo2 := DefaultOptions() + lo2.Cluster.Name = "xyz" lo2.LeafNode.ReconnectInterval = 5 * time.Millisecond lo2.LeafNode.connDelay = 100 * time.Millisecond lo2.LeafNode.Remotes = []*RemoteLeafOpts{ @@ -1405,6 +1410,7 @@ func TestLeafNodePermissionsConcurrentAccess(t *testing.T) { u, _ := url.Parse(fmt.Sprintf("nats://%s:%d", lo1.LeafNode.Host, lo1.LeafNode.Port)) lo2 := DefaultOptions() + lo2.Cluster.Name = "xyz" lo2.LeafNode.ReconnectInterval = 5 * time.Millisecond lo2.LeafNode.connDelay = 500 * time.Millisecond lo2.LeafNode.Remotes = []*RemoteLeafOpts{ @@ -1674,6 +1680,7 @@ func TestLeafNodeTmpClients(t *testing.T) { // Check with normal leafnode connection that once connected, // the tmp map is also emptied. bo := DefaultOptions() + bo.Cluster.Name = "xyz" bo.LeafNode.ReconnectInterval = 5 * time.Millisecond u, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ao.LeafNode.Port)) if err != nil { @@ -1729,6 +1736,8 @@ func TestLeafNodeTLSVerifyAndMap(t *testing.T) { defer s.Shutdown() slo := DefaultOptions() + slo.Cluster.Name = "xyz" + sltlsc := &tls.Config{} if test.provideCert { tc := &TLSConfigOpts{ @@ -1969,7 +1978,7 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) { remotes [ { url: "nats://127.0.0.1:%d" } ] } cluster { - name: "abc" + name: "xyz" listen: "127.0.0.1:-1" } `, hopts.LeafNode.Port))) @@ -1987,8 +1996,8 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) { checkLeafNodeConnected(t, s) l = grabLeaf() - if rc := l.remoteCluster(); rc != "abc" { - t.Fatalf("Expected a remote cluster name of \"abc\", got %q", rc) + if rc := l.remoteCluster(); rc != "xyz" { + t.Fatalf("Expected a remote cluster name of \"xyz\", got %q", rc) } pcid := l.cid @@ -2265,6 +2274,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { oLeaf1 := DefaultOptions() oLeaf1.ServerName = "leaf1" + oLeaf1.Cluster.Name = "xyz" oLeaf1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}} leaf1 := RunServer(oLeaf1) defer leaf1.Shutdown() @@ -2273,6 +2283,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { oLeaf2 := DefaultOptions() oLeaf2.ServerName = "leaf2" + oLeaf2.Cluster.Name = "xyz" oLeaf2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}} oLeaf2.Routes = RoutesFromStr(leaf1ClusterURL) leaf2 := RunServer(oLeaf2) @@ -2378,11 +2389,13 @@ func TestLeafNodeLMsgSplit(t *testing.T) { remoteLeafs := []*RemoteLeafOpts{{URLs: []*url.URL{u1, u2}}} oLeaf1 := DefaultOptions() + oLeaf1.Cluster.Name = "xyz" oLeaf1.LeafNode.Remotes = remoteLeafs leaf1 := RunServer(oLeaf1) defer leaf1.Shutdown() oLeaf2 := DefaultOptions() + oLeaf2.Cluster.Name = "xyz" oLeaf2.LeafNode.Remotes = remoteLeafs oLeaf2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oLeaf1.Cluster.Port)) leaf2 := RunServer(oLeaf2) @@ -2475,6 +2488,7 @@ func TestLeafNodeRouteParseLSUnsub(t *testing.T) { remoteLeafs := []*RemoteLeafOpts{{URLs: []*url.URL{u2}}} oLeaf2 := DefaultOptions() + oLeaf2.Cluster.Name = "xyz" oLeaf2.LeafNode.Remotes = remoteLeafs leaf2 := RunServer(oLeaf2) defer leaf2.Shutdown() @@ -3296,6 +3310,7 @@ func TestLeafNodeStreamImport(t *testing.T) { o2 := DefaultOptions() o2.LeafNode.Port = -1 + o2.Cluster.Name = "xyz" accB := NewAccount("B") if err := accB.AddStreamExport(">", nil); err != nil { diff --git a/server/monitor.go b/server/monitor.go index 9c257911..77f7d765 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2240,6 +2240,8 @@ func (reason ClosedState) String() string { return "Duplicate Server Name" case MinimumVersionRequired: return "Minimum Version Required" + case ClusterNamesIdentical: + return "Cluster Names Identical" } return "Unknown State" diff --git a/server/server_test.go b/server/server_test.go index 1ccdbb50..22900e64 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1828,6 +1828,7 @@ func TestReconnectErrorReports(t *testing.T) { // Now try with leaf nodes csOpts.Cluster.Port = 0 + csOpts.Cluster.Name = _EMPTY_ csOpts.LeafNode.Host = "127.0.0.1" csOpts.LeafNode.Port = -1 @@ -1835,6 +1836,7 @@ func TestReconnectErrorReports(t *testing.T) { defer cs.Shutdown() opts.Cluster.Port = 0 + opts.Cluster.Name = _EMPTY_ opts.Routes = nil u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", csOpts.LeafNode.Port)) opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}