From 63c750e295945af9ea71bdeb3ae81e1fddf54cea Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 15 Mar 2022 15:00:13 -0600 Subject: [PATCH] [CHANGED] Gateway: Detect duplicate names between clusters Gateway connection will be closed and error reported if a remote has a name that is a duplicate of the local cluster. Signed-off-by: Ivan Kozlovic --- server/client.go | 1 + server/errors.go | 4 ++ server/gateway.go | 14 +++++ server/gateway_test.go | 131 +++++++++++++++++++++++++++++++++++++++++ server/monitor.go | 2 + server/route.go | 23 +++++++- server/server.go | 4 ++ 7 files changed, 178 insertions(+), 1 deletion(-) diff --git a/server/client.go b/server/client.go index 5d372cd4..1c9ab73a 100644 --- a/server/client.go +++ b/server/client.go @@ -201,6 +201,7 @@ const ( ClusterNameConflict DuplicateRemoteLeafnodeConnection DuplicateClientID + DuplicateServerName ) // Some flags passed to processMsgResults diff --git a/server/errors.go b/server/errors.go index 1dcbe8d9..0edb5b9e 100644 --- a/server/errors.go +++ b/server/errors.go @@ -181,6 +181,10 @@ var ( // ErrCertNotPinned is returned when pinned certs are set and the certificate is not in it ErrCertNotPinned = errors.New("certificate not pinned") + + // ErrDuplicateServerName is returned when processing a server remote connection and + // the server reports that this server name is already used in the cluster. + ErrDuplicateServerName = errors.New("duplicate server name") ) // configErr is a configuration error. diff --git a/server/gateway.go b/server/gateway.go index ce5b7e6d..75022d03 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -1013,6 +1013,13 @@ func (c *client) processGatewayInfo(info *Info) { return } + // Check for duplicate server name with servers in our cluster + if s.isDuplicateServerName(info.Name) { + c.Errorf("Remote server has a duplicate name: %q", info.Name) + c.closeConnection(DuplicateServerName) + return + } + // Possibly add URLs that we get from the INFO protocol. if len(info.GatewayURLs) > 0 { cfg.updateURLs(info.GatewayURLs) @@ -1084,6 +1091,13 @@ func (c *client) processGatewayInfo(info *Info) { } else if isFirstINFO { // This is the first INFO of an inbound connection... + // Check for duplicate server name with servers in our cluster + if s.isDuplicateServerName(info.Name) { + c.Errorf("Remote server has a duplicate name: %q", info.Name) + c.closeConnection(DuplicateServerName) + return + } + s.registerInboundGatewayConnection(cid, c) c.Noticef("Inbound gateway connection from %q (%s) registered", info.Gateway, info.ID) diff --git a/server/gateway_test.go b/server/gateway_test.go index 8d7ae9e0..234462c3 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -6618,3 +6618,134 @@ func TestGatewayURLsNotRemovedOnDuplicateRoute(t *testing.T) { t.Fatalf("Expected 2 urls to B, got %v", urls) } } + +func TestGatewayDuplicateServerName(t *testing.T) { + // We will have 2 servers per cluster names "nats1" and "nats2", and have + // the servers in the second cluster with the same name, but we will make + // sure to connect "A/nats1" to "B/nats2" and "A/nats2" to "B/nats1" and + // verify that we still discover the duplicate names. + ob1 := testDefaultOptionsForGateway("B") + ob1.ServerName = "nats1" + sb1 := RunServer(ob1) + defer sb1.Shutdown() + + ob2 := testDefaultOptionsForGateway("B") + ob2.ServerName = "nats2" + ob2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ob1.Cluster.Port)) + sb2 := RunServer(ob2) + defer sb2.Shutdown() + + checkClusterFormed(t, sb1, sb2) + + oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2) + oa1.ServerName = "nats1" + // Needed later in the test + oa1.Gateway.RejectUnknown = true + sa1 := RunServer(oa1) + defer sa1.Shutdown() + sa1l := &captureErrorLogger{errCh: make(chan string, 100)} + sa1.SetLogger(sa1l, false, false) + + oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb1) + oa2.ServerName = "nats2" + // Needed later in the test + oa2.Gateway.RejectUnknown = true + oa2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oa1.Cluster.Port)) + sa2 := RunServer(oa2) + defer sa2.Shutdown() + sa2l := &captureErrorLogger{errCh: make(chan string, 100)} + sa2.SetLogger(sa2l, false, false) + + checkClusterFormed(t, sa1, sa2) + + checkForDupError := func(errCh chan string) { + t.Helper() + timeout := time.NewTimer(time.Second) + for done := false; !done; { + select { + case err := <-errCh: + if strings.Contains(err, "server has a duplicate name") { + done = true + } + case <-timeout.C: + t.Fatal("Did not get error about servers in super-cluster with same name") + } + } + } + + // Since only servers from "A" have configured outbound to + // cluster "B", only servers on "A" are expected to report error. + for _, errCh := range []chan string{sa1l.errCh, sa2l.errCh} { + checkForDupError(errCh) + } + + // So now we are going to fix names and wait for the super cluster to form. + sa2.Shutdown() + sa1.Shutdown() + + // Drain the error channels + for _, errCh := range []chan string{sa1l.errCh, sa2l.errCh} { + for done := false; !done; { + select { + case <-errCh: + default: + done = true + } + } + } + + oa1.ServerName = "a_nats1" + oa2.ServerName = "a_nats2" + sa1 = RunServer(oa1) + defer sa1.Shutdown() + sa2 = RunServer(oa2) + defer sa2.Shutdown() + + checkClusterFormed(t, sa1, sa2) + + waitForOutboundGateways(t, sa1, 1, 2*time.Second) + waitForOutboundGateways(t, sa2, 1, 2*time.Second) + waitForOutboundGateways(t, sb1, 1, 2*time.Second) + waitForOutboundGateways(t, sb2, 1, 2*time.Second) + + // Now add a server on cluster B (that does not have outbound + // gateway connections explicitly defined) and use the name + // of one of the cluster A's server. We should get an error. + ob3 := testDefaultOptionsForGateway("B") + ob3.ServerName = "a_nats2" + ob3.Accounts = []*Account{NewAccount("sys")} + ob3.SystemAccount = "sys" + ob3.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ob2.Cluster.Port)) + sb3 := RunServer(ob3) + defer sb3.Shutdown() + sb3l := &captureErrorLogger{errCh: make(chan string, 100)} + sb3.SetLogger(sb3l, false, false) + + checkClusterFormed(t, sb1, sb2, sb3) + + // It should report the error when trying to create the GW connection + checkForDupError(sb3l.errCh) + + // Stop this node + sb3.Shutdown() + checkClusterFormed(t, sb1, sb2) + + // Now create a GW "C" with a server that uses the same name than one of + // the server on "A", say "a_nats2". + // This server will connect to "B", and "B" will gossip "A" back to "C" + // and "C" will then try to connect to "A", but "A" rejects unknown, so + // connection will be refused. However, we want to make sure that the + // duplicate server name is still detected. + oc := testGatewayOptionsFromToWithServers(t, "C", "B", sb1) + oc.ServerName = "a_nats2" + oc.Accounts = []*Account{NewAccount("sys")} + oc.SystemAccount = "sys" + sc := RunServer(oc) + defer sc.Shutdown() + scl := &captureErrorLogger{errCh: make(chan string, 100)} + sc.SetLogger(scl, false, false) + + // It should report the error when trying to create the GW connection + // to cluster "A" + checkForDupError(scl.errCh) +} diff --git a/server/monitor.go b/server/monitor.go index 25b7c21c..81eb48f7 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2102,6 +2102,8 @@ func (reason ClosedState) String() string { return "Duplicate Remote LeafNode Connection" case DuplicateClientID: return "Duplicate Client ID" + case DuplicateServerName: + return "Duplicate Server Name" } return "Unknown State" diff --git a/server/route.go b/server/route.go index 583c9881..927faf3b 100644 --- a/server/route.go +++ b/server/route.go @@ -582,7 +582,7 @@ func (c *client) processRouteInfo(info *Info) { c.mu.Unlock() // This is now an error and we close the connection. We need unique names for JetStream clustering. c.Errorf("Remote server has a duplicate name: %q", info.Name) - c.closeConnection(DuplicateRoute) + c.closeConnection(DuplicateServerName) return } @@ -2027,3 +2027,24 @@ func (s *Server) removeRoute(c *client) { s.removeFromTempClients(cid) s.mu.Unlock() } + +func (s *Server) isDuplicateServerName(name string) bool { + if name == _EMPTY_ { + return false + } + s.mu.Lock() + defer s.mu.Unlock() + + if s.info.Name == name { + return true + } + for _, r := range s.routes { + r.mu.Lock() + duplicate := r.route.remoteName == name + r.mu.Unlock() + if duplicate { + return true + } + } + return false +} diff --git a/server/server.go b/server/server.go index a15301b9..96474680 100644 --- a/server/server.go +++ b/server/server.go @@ -1613,10 +1613,14 @@ func (s *Server) Start() { // Snapshot server options. opts := s.getOpts() + clusterName := s.ClusterName() s.Noticef(" Version: %s", VERSION) s.Noticef(" Git: [%s]", gc) s.Debugf(" Go build: %s", s.info.GoVersion) + if clusterName != _EMPTY_ { + s.Noticef(" Cluster: %s", clusterName) + } s.Noticef(" Name: %s", s.info.Name) if opts.JetStream { s.Noticef(" Node: %s", getHash(s.info.Name))