mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
[CHANGED] Gateway: Detect duplicate names between clusters
Gateway connection will be closed and error reported if a remote has a name that is a duplicate of the local cluster. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -201,6 +201,7 @@ const (
|
||||
ClusterNameConflict
|
||||
DuplicateRemoteLeafnodeConnection
|
||||
DuplicateClientID
|
||||
DuplicateServerName
|
||||
)
|
||||
|
||||
// Some flags passed to processMsgResults
|
||||
|
||||
@@ -181,6 +181,10 @@ var (
|
||||
|
||||
// ErrCertNotPinned is returned when pinned certs are set and the certificate is not in it
|
||||
ErrCertNotPinned = errors.New("certificate not pinned")
|
||||
|
||||
// ErrDuplicateServerName is returned when processing a server remote connection and
|
||||
// the server reports that this server name is already used in the cluster.
|
||||
ErrDuplicateServerName = errors.New("duplicate server name")
|
||||
)
|
||||
|
||||
// configErr is a configuration error.
|
||||
|
||||
@@ -1013,6 +1013,13 @@ func (c *client) processGatewayInfo(info *Info) {
|
||||
return
|
||||
}
|
||||
|
||||
// Check for duplicate server name with servers in our cluster
|
||||
if s.isDuplicateServerName(info.Name) {
|
||||
c.Errorf("Remote server has a duplicate name: %q", info.Name)
|
||||
c.closeConnection(DuplicateServerName)
|
||||
return
|
||||
}
|
||||
|
||||
// Possibly add URLs that we get from the INFO protocol.
|
||||
if len(info.GatewayURLs) > 0 {
|
||||
cfg.updateURLs(info.GatewayURLs)
|
||||
@@ -1084,6 +1091,13 @@ func (c *client) processGatewayInfo(info *Info) {
|
||||
} else if isFirstINFO {
|
||||
// This is the first INFO of an inbound connection...
|
||||
|
||||
// Check for duplicate server name with servers in our cluster
|
||||
if s.isDuplicateServerName(info.Name) {
|
||||
c.Errorf("Remote server has a duplicate name: %q", info.Name)
|
||||
c.closeConnection(DuplicateServerName)
|
||||
return
|
||||
}
|
||||
|
||||
s.registerInboundGatewayConnection(cid, c)
|
||||
c.Noticef("Inbound gateway connection from %q (%s) registered", info.Gateway, info.ID)
|
||||
|
||||
|
||||
@@ -6618,3 +6618,134 @@ func TestGatewayURLsNotRemovedOnDuplicateRoute(t *testing.T) {
|
||||
t.Fatalf("Expected 2 urls to B, got %v", urls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGatewayDuplicateServerName(t *testing.T) {
|
||||
// We will have 2 servers per cluster names "nats1" and "nats2", and have
|
||||
// the servers in the second cluster with the same name, but we will make
|
||||
// sure to connect "A/nats1" to "B/nats2" and "A/nats2" to "B/nats1" and
|
||||
// verify that we still discover the duplicate names.
|
||||
ob1 := testDefaultOptionsForGateway("B")
|
||||
ob1.ServerName = "nats1"
|
||||
sb1 := RunServer(ob1)
|
||||
defer sb1.Shutdown()
|
||||
|
||||
ob2 := testDefaultOptionsForGateway("B")
|
||||
ob2.ServerName = "nats2"
|
||||
ob2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ob1.Cluster.Port))
|
||||
sb2 := RunServer(ob2)
|
||||
defer sb2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, sb1, sb2)
|
||||
|
||||
oa1 := testGatewayOptionsFromToWithServers(t, "A", "B", sb2)
|
||||
oa1.ServerName = "nats1"
|
||||
// Needed later in the test
|
||||
oa1.Gateway.RejectUnknown = true
|
||||
sa1 := RunServer(oa1)
|
||||
defer sa1.Shutdown()
|
||||
sa1l := &captureErrorLogger{errCh: make(chan string, 100)}
|
||||
sa1.SetLogger(sa1l, false, false)
|
||||
|
||||
oa2 := testGatewayOptionsFromToWithServers(t, "A", "B", sb1)
|
||||
oa2.ServerName = "nats2"
|
||||
// Needed later in the test
|
||||
oa2.Gateway.RejectUnknown = true
|
||||
oa2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oa1.Cluster.Port))
|
||||
sa2 := RunServer(oa2)
|
||||
defer sa2.Shutdown()
|
||||
sa2l := &captureErrorLogger{errCh: make(chan string, 100)}
|
||||
sa2.SetLogger(sa2l, false, false)
|
||||
|
||||
checkClusterFormed(t, sa1, sa2)
|
||||
|
||||
checkForDupError := func(errCh chan string) {
|
||||
t.Helper()
|
||||
timeout := time.NewTimer(time.Second)
|
||||
for done := false; !done; {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
if strings.Contains(err, "server has a duplicate name") {
|
||||
done = true
|
||||
}
|
||||
case <-timeout.C:
|
||||
t.Fatal("Did not get error about servers in super-cluster with same name")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Since only servers from "A" have configured outbound to
|
||||
// cluster "B", only servers on "A" are expected to report error.
|
||||
for _, errCh := range []chan string{sa1l.errCh, sa2l.errCh} {
|
||||
checkForDupError(errCh)
|
||||
}
|
||||
|
||||
// So now we are going to fix names and wait for the super cluster to form.
|
||||
sa2.Shutdown()
|
||||
sa1.Shutdown()
|
||||
|
||||
// Drain the error channels
|
||||
for _, errCh := range []chan string{sa1l.errCh, sa2l.errCh} {
|
||||
for done := false; !done; {
|
||||
select {
|
||||
case <-errCh:
|
||||
default:
|
||||
done = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
oa1.ServerName = "a_nats1"
|
||||
oa2.ServerName = "a_nats2"
|
||||
sa1 = RunServer(oa1)
|
||||
defer sa1.Shutdown()
|
||||
sa2 = RunServer(oa2)
|
||||
defer sa2.Shutdown()
|
||||
|
||||
checkClusterFormed(t, sa1, sa2)
|
||||
|
||||
waitForOutboundGateways(t, sa1, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sa2, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sb1, 1, 2*time.Second)
|
||||
waitForOutboundGateways(t, sb2, 1, 2*time.Second)
|
||||
|
||||
// Now add a server on cluster B (that does not have outbound
|
||||
// gateway connections explicitly defined) and use the name
|
||||
// of one of the cluster A's server. We should get an error.
|
||||
ob3 := testDefaultOptionsForGateway("B")
|
||||
ob3.ServerName = "a_nats2"
|
||||
ob3.Accounts = []*Account{NewAccount("sys")}
|
||||
ob3.SystemAccount = "sys"
|
||||
ob3.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ob2.Cluster.Port))
|
||||
sb3 := RunServer(ob3)
|
||||
defer sb3.Shutdown()
|
||||
sb3l := &captureErrorLogger{errCh: make(chan string, 100)}
|
||||
sb3.SetLogger(sb3l, false, false)
|
||||
|
||||
checkClusterFormed(t, sb1, sb2, sb3)
|
||||
|
||||
// It should report the error when trying to create the GW connection
|
||||
checkForDupError(sb3l.errCh)
|
||||
|
||||
// Stop this node
|
||||
sb3.Shutdown()
|
||||
checkClusterFormed(t, sb1, sb2)
|
||||
|
||||
// Now create a GW "C" with a server that uses the same name than one of
|
||||
// the server on "A", say "a_nats2".
|
||||
// This server will connect to "B", and "B" will gossip "A" back to "C"
|
||||
// and "C" will then try to connect to "A", but "A" rejects unknown, so
|
||||
// connection will be refused. However, we want to make sure that the
|
||||
// duplicate server name is still detected.
|
||||
oc := testGatewayOptionsFromToWithServers(t, "C", "B", sb1)
|
||||
oc.ServerName = "a_nats2"
|
||||
oc.Accounts = []*Account{NewAccount("sys")}
|
||||
oc.SystemAccount = "sys"
|
||||
sc := RunServer(oc)
|
||||
defer sc.Shutdown()
|
||||
scl := &captureErrorLogger{errCh: make(chan string, 100)}
|
||||
sc.SetLogger(scl, false, false)
|
||||
|
||||
// It should report the error when trying to create the GW connection
|
||||
// to cluster "A"
|
||||
checkForDupError(scl.errCh)
|
||||
}
|
||||
|
||||
@@ -2102,6 +2102,8 @@ func (reason ClosedState) String() string {
|
||||
return "Duplicate Remote LeafNode Connection"
|
||||
case DuplicateClientID:
|
||||
return "Duplicate Client ID"
|
||||
case DuplicateServerName:
|
||||
return "Duplicate Server Name"
|
||||
}
|
||||
|
||||
return "Unknown State"
|
||||
|
||||
@@ -582,7 +582,7 @@ func (c *client) processRouteInfo(info *Info) {
|
||||
c.mu.Unlock()
|
||||
// This is now an error and we close the connection. We need unique names for JetStream clustering.
|
||||
c.Errorf("Remote server has a duplicate name: %q", info.Name)
|
||||
c.closeConnection(DuplicateRoute)
|
||||
c.closeConnection(DuplicateServerName)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -2027,3 +2027,24 @@ func (s *Server) removeRoute(c *client) {
|
||||
s.removeFromTempClients(cid)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Server) isDuplicateServerName(name string) bool {
|
||||
if name == _EMPTY_ {
|
||||
return false
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.info.Name == name {
|
||||
return true
|
||||
}
|
||||
for _, r := range s.routes {
|
||||
r.mu.Lock()
|
||||
duplicate := r.route.remoteName == name
|
||||
r.mu.Unlock()
|
||||
if duplicate {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -1613,10 +1613,14 @@ func (s *Server) Start() {
|
||||
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
clusterName := s.ClusterName()
|
||||
|
||||
s.Noticef(" Version: %s", VERSION)
|
||||
s.Noticef(" Git: [%s]", gc)
|
||||
s.Debugf(" Go build: %s", s.info.GoVersion)
|
||||
if clusterName != _EMPTY_ {
|
||||
s.Noticef(" Cluster: %s", clusterName)
|
||||
}
|
||||
s.Noticef(" Name: %s", s.info.Name)
|
||||
if opts.JetStream {
|
||||
s.Noticef(" Node: %s", getHash(s.info.Name))
|
||||
|
||||
Reference in New Issue
Block a user