mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] LeafNode reject duplicate remote
There was a test to prevent an errorneous loop detection when a remote would reconnect (due to a stale connection) while the accepting side did not detect the bad connection yet. However, this test was racy because the test was done prior to add the connections to the map. In the case of a misconfiguration where the remote creates 2 different remote connections that end-up binding to the same account in the accepting side, then it was possible that this would not be detected. And when it was, the remote side would be unaware since the disconnect/ reconnect attempts would not show up if not running in debug mode. This change makes sure that the detection is no longer racy and returns an error to the remote so at least the log/console of the remote will show the "duplicate connection" error messages. Resolves #1730 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -177,6 +177,7 @@ const (
|
||||
MsgHeaderViolation
|
||||
NoRespondersRequiresHeaders
|
||||
ClusterNameConflict
|
||||
DuplicateRemoteLeafnodeConnection
|
||||
)
|
||||
|
||||
// Some flags passed to processMsgResults
|
||||
|
||||
@@ -836,7 +836,7 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {
|
||||
if solicited {
|
||||
// Make sure we register with the account here.
|
||||
c.registerWithAccount(acc)
|
||||
s.addLeafNodeConnection(c)
|
||||
s.addLeafNodeConnection(c, _EMPTY_, false)
|
||||
s.initLeafNodeSmapAndSendSubs(c)
|
||||
if sendSysConnectEvent {
|
||||
s.sendLeafNodeConnect(acc)
|
||||
@@ -1006,14 +1006,53 @@ func (s *Server) setLeafNodeInfoHostPortAndIP() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) addLeafNodeConnection(c *client) {
|
||||
// Add the connection to the map of leaf nodes.
|
||||
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
|
||||
// if a connection already exists for the same server name (ID) and account.
|
||||
// That can happen when the remote is attempting to reconnect while the accepting
|
||||
// side did not detect the connection as broken yet.
|
||||
// But it can also happen when there is a misconfiguration and the remote is
|
||||
// creating two (or more) connections that bind to the same account on the accept
|
||||
// side.
|
||||
// When a duplicate is found, the new connection is accepted and the old is closed
|
||||
// (this solves the stale connection situation). An error is returned to help the
|
||||
// remote detect the misconfiguration when the duplicate is the result of that
|
||||
// misconfiguration.
|
||||
func (s *Server) addLeafNodeConnection(c *client, srvName string, checkForDup bool) {
|
||||
var accName string
|
||||
c.mu.Lock()
|
||||
cid := c.cid
|
||||
if c.acc != nil {
|
||||
accName = c.acc.Name
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
var old *client
|
||||
s.mu.Lock()
|
||||
if checkForDup {
|
||||
for _, ol := range s.leafs {
|
||||
ol.mu.Lock()
|
||||
// We check for empty because in some test we may send empty CONNECT{}
|
||||
if srvName != _EMPTY_ && ol.opts.Name == srvName && ol.acc.Name == accName {
|
||||
old = ol
|
||||
}
|
||||
ol.mu.Unlock()
|
||||
if old != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
// Store new connection in the map
|
||||
s.leafs[cid] = c
|
||||
s.mu.Unlock()
|
||||
s.removeFromTempClients(cid)
|
||||
|
||||
// If applicable, evict the old one.
|
||||
if old != nil {
|
||||
old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
|
||||
old.closeConnection(DuplicateRemoteLeafnodeConnection)
|
||||
c.Warnf("Replacing connection from same server")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) removeLeafNodeConnection(c *client) {
|
||||
@@ -1073,9 +1112,6 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
return ErrWrongGateway
|
||||
}
|
||||
|
||||
// Check for stale connection from same server/account
|
||||
c.replaceOldLeafNodeConnIfNeeded(s, proto)
|
||||
|
||||
// Leaf Nodes do not do echo or verbose or pedantic.
|
||||
c.opts.Verbose = false
|
||||
c.opts.Echo = false
|
||||
@@ -1091,6 +1127,9 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
c.leaf.remoteCluster = proto.Cluster
|
||||
}
|
||||
|
||||
// Add in the leafnode here since we passed through auth at this point.
|
||||
s.addLeafNodeConnection(c, proto.Name, true)
|
||||
|
||||
// If we have permissions bound to this leafnode we need to send then back to the
|
||||
// origin server for local enforcement.
|
||||
s.sendPermsInfo(c)
|
||||
@@ -1099,9 +1138,6 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
// This will send all registered subs too.
|
||||
s.initLeafNodeSmapAndSendSubs(c)
|
||||
|
||||
// Add in the leafnode here since we passed through auth at this point.
|
||||
s.addLeafNodeConnection(c)
|
||||
|
||||
// Announce the account connect event for a leaf node.
|
||||
// This will no-op as needed.
|
||||
s.sendLeafNodeConnect(c.acc)
|
||||
@@ -1109,42 +1145,6 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// Invoked from a server accepting a leafnode connection. It looks for a possible
|
||||
// existing leafnode connection from the same server with the same account, and
|
||||
// if it finds one, closes it so that the new one is accepted and not reported as
|
||||
// forming a cycle.
|
||||
//
|
||||
// This must be invoked for LEAF client types, and on the server accepting the connection.
|
||||
//
|
||||
// No server nor client lock held on entry.
|
||||
func (c *client) replaceOldLeafNodeConnIfNeeded(s *Server, connInfo *leafConnectInfo) {
|
||||
var accName string
|
||||
c.mu.Lock()
|
||||
if c.acc != nil {
|
||||
accName = c.acc.Name
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
var old *client
|
||||
s.mu.Lock()
|
||||
for _, ol := range s.leafs {
|
||||
ol.mu.Lock()
|
||||
// We check for empty because in some test we may send empty CONNECT{}
|
||||
if ol.opts.Name == connInfo.Name && connInfo.Name != _EMPTY_ && ol.acc.Name == accName {
|
||||
old = ol
|
||||
}
|
||||
ol.mu.Unlock()
|
||||
if old != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
if old != nil {
|
||||
old.Warnf("Replacing connection from same server")
|
||||
old.closeConnection(ReadError)
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the remote cluster name. This is set only once so does not require a lock.
|
||||
func (c *client) remoteCluster() string {
|
||||
if c.leaf == nil {
|
||||
|
||||
@@ -1803,6 +1803,66 @@ func TestLeafNodeLoopDetectedDueToReconnect(t *testing.T) {
|
||||
checkLeafNodeConnected(t, sl)
|
||||
}
|
||||
|
||||
func TestLeafNodeTwoRemotesBindToSameAccount(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.LeafNode.Host = "127.0.0.1"
|
||||
opts.LeafNode.Port = -1
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
conf := `
|
||||
listen: 127.0.0.1:-1
|
||||
cluster { name: ln22, listen: 127.0.0.1:-1 }
|
||||
accounts {
|
||||
a { users [ {user: a, password: a} ]}
|
||||
b { users [ {user: b, password: b} ]}
|
||||
}
|
||||
leafnodes {
|
||||
remotes = [
|
||||
{
|
||||
url: nats-leaf://127.0.0.1:%d
|
||||
account: a
|
||||
}
|
||||
{
|
||||
url: nats-leaf://127.0.0.1:%d
|
||||
account: b
|
||||
}
|
||||
]
|
||||
}
|
||||
`
|
||||
lconf := createConfFile(t, []byte(fmt.Sprintf(conf, opts.LeafNode.Port, opts.LeafNode.Port)))
|
||||
defer os.Remove(lconf)
|
||||
|
||||
lopts, err := ProcessConfigFile(lconf)
|
||||
if err != nil {
|
||||
t.Fatalf("Error loading config file: %v", err)
|
||||
}
|
||||
lopts.NoLog = false
|
||||
ln, err := NewServer(lopts)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating server: %v", err)
|
||||
}
|
||||
defer ln.Shutdown()
|
||||
l := &captureErrorLogger{errCh: make(chan string, 10)}
|
||||
ln.SetLogger(l, false, false)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ln.Start()
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-l.errCh:
|
||||
fmt.Printf("@@IK: err=%q\n", err)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Did not get any error")
|
||||
}
|
||||
ln.Shutdown()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {
|
||||
// This set the cluster name to "abc"
|
||||
oSrv1 := DefaultOptions()
|
||||
|
||||
@@ -1913,6 +1913,8 @@ func (reason ClosedState) String() string {
|
||||
return "No Responders Requires Headers"
|
||||
case ClusterNameConflict:
|
||||
return "Cluster Name Conflict"
|
||||
case DuplicateRemoteLeafnodeConnection:
|
||||
return "Duplicate Remote LeafNode Connection"
|
||||
}
|
||||
|
||||
return "Unknown State"
|
||||
|
||||
Reference in New Issue
Block a user