diff --git a/server/client.go b/server/client.go index 60c5e26c..e1ab4ffc 100644 --- a/server/client.go +++ b/server/client.go @@ -177,6 +177,7 @@ const ( MsgHeaderViolation NoRespondersRequiresHeaders ClusterNameConflict + DuplicateRemoteLeafnodeConnection ) // Some flags passed to processMsgResults diff --git a/server/leafnode.go b/server/leafnode.go index f6b21fd4..72dad43e 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -836,7 +836,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client { if solicited { // Make sure we register with the account here. c.registerWithAccount(acc) - s.addLeafNodeConnection(c) + s.addLeafNodeConnection(c, _EMPTY_, false) s.initLeafNodeSmapAndSendSubs(c) if sendSysConnectEvent { s.sendLeafNodeConnect(acc) @@ -1006,14 +1006,53 @@ func (s *Server) setLeafNodeInfoHostPortAndIP() error { return nil } -func (s *Server) addLeafNodeConnection(c *client) { +// Add the connection to the map of leaf nodes. +// If `checkForDup` is true (invoked when a leafnode is accepted), then we check +// if a connection already exists for the same server name (ID) and account. +// That can happen when the remote is attempting to reconnect while the accepting +// side did not detect the connection as broken yet. +// But it can also happen when there is a misconfiguration and the remote is +// creating two (or more) connections that bind to the same account on the accept +// side. +// When a duplicate is found, the new connection is accepted and the old is closed +// (this solves the stale connection situation). An error is returned to help the +// remote detect the misconfiguration when the duplicate is the result of that +// misconfiguration. +func (s *Server) addLeafNodeConnection(c *client, srvName string, checkForDup bool) { + var accName string c.mu.Lock() cid := c.cid + if c.acc != nil { + accName = c.acc.Name + } c.mu.Unlock() + + var old *client s.mu.Lock() + if checkForDup { + for _, ol := range s.leafs { + ol.mu.Lock() + // We check for empty because in some test we may send empty CONNECT{} + if srvName != _EMPTY_ && ol.opts.Name == srvName && ol.acc.Name == accName { + old = ol + } + ol.mu.Unlock() + if old != nil { + break + } + } + } + // Store new connection in the map s.leafs[cid] = c s.mu.Unlock() s.removeFromTempClients(cid) + + // If applicable, evict the old one. + if old != nil { + old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String()) + old.closeConnection(DuplicateRemoteLeafnodeConnection) + c.Warnf("Replacing connection from same server") + } } func (s *Server) removeLeafNodeConnection(c *client) { @@ -1073,9 +1112,6 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return ErrWrongGateway } - // Check for stale connection from same server/account - c.replaceOldLeafNodeConnIfNeeded(s, proto) - // Leaf Nodes do not do echo or verbose or pedantic. c.opts.Verbose = false c.opts.Echo = false @@ -1091,6 +1127,9 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro c.leaf.remoteCluster = proto.Cluster } + // Add in the leafnode here since we passed through auth at this point. + s.addLeafNodeConnection(c, proto.Name, true) + // If we have permissions bound to this leafnode we need to send then back to the // origin server for local enforcement. s.sendPermsInfo(c) @@ -1099,9 +1138,6 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro // This will send all registered subs too. s.initLeafNodeSmapAndSendSubs(c) - // Add in the leafnode here since we passed through auth at this point. - s.addLeafNodeConnection(c) - // Announce the account connect event for a leaf node. // This will no-op as needed. s.sendLeafNodeConnect(c.acc) @@ -1109,42 +1145,6 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return nil } -// Invoked from a server accepting a leafnode connection. It looks for a possible -// existing leafnode connection from the same server with the same account, and -// if it finds one, closes it so that the new one is accepted and not reported as -// forming a cycle. -// -// This must be invoked for LEAF client types, and on the server accepting the connection. -// -// No server nor client lock held on entry. -func (c *client) replaceOldLeafNodeConnIfNeeded(s *Server, connInfo *leafConnectInfo) { - var accName string - c.mu.Lock() - if c.acc != nil { - accName = c.acc.Name - } - c.mu.Unlock() - - var old *client - s.mu.Lock() - for _, ol := range s.leafs { - ol.mu.Lock() - // We check for empty because in some test we may send empty CONNECT{} - if ol.opts.Name == connInfo.Name && connInfo.Name != _EMPTY_ && ol.acc.Name == accName { - old = ol - } - ol.mu.Unlock() - if old != nil { - break - } - } - s.mu.Unlock() - if old != nil { - old.Warnf("Replacing connection from same server") - old.closeConnection(ReadError) - } -} - // Returns the remote cluster name. This is set only once so does not require a lock. func (c *client) remoteCluster() string { if c.leaf == nil { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index e87b24d0..c4c01b68 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1803,6 +1803,66 @@ func TestLeafNodeLoopDetectedDueToReconnect(t *testing.T) { checkLeafNodeConnected(t, sl) } +func TestLeafNodeTwoRemotesBindToSameAccount(t *testing.T) { + opts := DefaultOptions() + opts.LeafNode.Host = "127.0.0.1" + opts.LeafNode.Port = -1 + s := RunServer(opts) + defer s.Shutdown() + + conf := ` + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1 } + accounts { + a { users [ {user: a, password: a} ]} + b { users [ {user: b, password: b} ]} + } + leafnodes { + remotes = [ + { + url: nats-leaf://127.0.0.1:%d + account: a + } + { + url: nats-leaf://127.0.0.1:%d + account: b + } + ] + } + ` + lconf := createConfFile(t, []byte(fmt.Sprintf(conf, opts.LeafNode.Port, opts.LeafNode.Port))) + defer os.Remove(lconf) + + lopts, err := ProcessConfigFile(lconf) + if err != nil { + t.Fatalf("Error loading config file: %v", err) + } + lopts.NoLog = false + ln, err := NewServer(lopts) + if err != nil { + t.Fatalf("Error creating server: %v", err) + } + defer ln.Shutdown() + l := &captureErrorLogger{errCh: make(chan string, 10)} + ln.SetLogger(l, false, false) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + ln.Start() + }() + + select { + case err := <-l.errCh: + fmt.Printf("@@IK: err=%q\n", err) + case <-time.After(2 * time.Second): + t.Fatal("Did not get any error") + } + ln.Shutdown() + wg.Wait() +} + func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { // This set the cluster name to "abc" oSrv1 := DefaultOptions() diff --git a/server/monitor.go b/server/monitor.go index a1e3858b..4bae0fda 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1913,6 +1913,8 @@ func (reason ClosedState) String() string { return "No Responders Requires Headers" case ClusterNameConflict: return "Cluster Name Conflict" + case DuplicateRemoteLeafnodeConnection: + return "Duplicate Remote LeafNode Connection" } return "Unknown State"