diff --git a/server/leafnode.go b/server/leafnode.go index 358d07c3..f7d7cd84 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1035,6 +1035,9 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return ErrWrongGateway } + // Check for stale connection from same server/account + c.replaceOldLeafNodeConnIfNeeded(s, proto) + // Leaf Nodes do not do echo or verbose or pedantic. c.opts.Verbose = false c.opts.Echo = false @@ -1068,6 +1071,42 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return nil } +// Invoked from a server accepting a leafnode connection. It looks for a possible +// existing leafnode connection from the same server with the same account, and +// if it finds one, closes it so that the new one is accepted and not reported as +// forming a cycle. +// +// This must be invoked for LEAF client types, and on the server accepting the connection. +// +// Neither the server lock nor the client lock is held on entry. +func (c *client) replaceOldLeafNodeConnIfNeeded(s *Server, connInfo *leafConnectInfo) { + var accName string + c.mu.Lock() + if c.acc != nil { + accName = c.acc.Name + } + c.mu.Unlock() + + var old *client + s.mu.Lock() + for _, ol := range s.leafs { + ol.mu.Lock() + // We check for empty because in some tests we may send an empty CONNECT{} + if ol.opts.Name == connInfo.Name && connInfo.Name != _EMPTY_ && ol.acc.Name == accName { + old = ol + } + ol.mu.Unlock() + if old != nil { + break + } + } + s.mu.Unlock() + if old != nil { + old.Warnf("Replacing connection from same server") + old.closeConnection(ReadError) + } +} + // Returns the remote cluster name. This is set only once so does not require a lock. 
func (c *client) remoteCluster() string { if c.leaf == nil { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 0555908b..d9c0f626 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -22,6 +22,7 @@ import ( "net/url" "os" "strings" + "sync" "sync/atomic" "testing" "time" @@ -1650,3 +1651,154 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) { t.Fatalf("Expected a different id, got the same") } } + +type proxyAcceptDetectFailureLate struct { + sync.Mutex + wg sync.WaitGroup + acceptPort int + l net.Listener + srvs []net.Conn + leaf net.Conn +} + +func (p *proxyAcceptDetectFailureLate) run(t *testing.T) int { + l, err := natsListen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Error on listen: %v", err) + } + p.Lock() + p.l = l + p.Unlock() + port := l.Addr().(*net.TCPAddr).Port + p.wg.Add(1) + go func() { + defer p.wg.Done() + defer l.Close() + defer func() { + p.Lock() + for _, c := range p.srvs { + c.Close() + } + p.Unlock() + }() + for { + c, err := l.Accept() + if err != nil { + return + } + srv, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", p.acceptPort)) + if err != nil { + return + } + p.Lock() + p.leaf = c + p.srvs = append(p.srvs, srv) + p.Unlock() + + transfer := func(c1, c2 net.Conn) { + var buf [1024]byte + for { + n, err := c1.Read(buf[:]) + if err != nil { + return + } + if _, err := c2.Write(buf[:n]); err != nil { + return + } + } + } + + go transfer(srv, c) + go transfer(c, srv) + } + }() + return port +} + +func (p *proxyAcceptDetectFailureLate) close() { + p.Lock() + p.l.Close() + p.Unlock() + + p.wg.Wait() +} + +type oldConnReplacedLogger struct { + DummyLogger + errCh chan string + warnCh chan string +} + +func (l *oldConnReplacedLogger) Errorf(format string, v ...interface{}) { + select { + case l.errCh <- fmt.Sprintf(format, v...): + default: + } +} + +func (l *oldConnReplacedLogger) Warnf(format string, v ...interface{}) { + select { + case l.warnCh <- fmt.Sprintf(format, v...): + default: + } 
+} + +// This test will simulate that the accept side does not detect the connection +// has been closed early enough. The soliciting side will attempt to reconnect +// and we should not be getting the "loop detected" error. +func TestLeafNodeLoopDetectedDueToReconnect(t *testing.T) { + o := DefaultOptions() + o.LeafNode.Host = "127.0.0.1" + o.LeafNode.Port = -1 + s := RunServer(o) + defer s.Shutdown() + + l := &oldConnReplacedLogger{errCh: make(chan string, 10), warnCh: make(chan string, 10)} + s.SetLogger(l, false, false) + + p := &proxyAcceptDetectFailureLate{acceptPort: o.LeafNode.Port} + defer p.close() + port := p.run(t) + + aurl, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", port)) + if err != nil { + t.Fatalf("Error parsing url: %v", err) + } + ol := DefaultOptions() + ol.Cluster.Name = "cde" + ol.LeafNode.ReconnectInterval = 50 * time.Millisecond + ol.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{aurl}}} + sl := RunServer(ol) + defer sl.Shutdown() + + checkLeafNodeConnected(t, s) + checkLeafNodeConnected(t, sl) + + // Cause disconnect client side... 
+ p.Lock() + p.leaf.Close() + p.Unlock() + + // Make sure we did not get the loop detected error + select { + case e := <-l.errCh: + if strings.Contains(e, "Loop detected") { + t.Fatalf("Loop detected: %v", e) + } + case <-time.After(250 * time.Millisecond): + // We are ok + } + + // Now make sure we got the warning + select { + case w := <-l.warnCh: + if !strings.Contains(w, "Replacing connection from same server") { + t.Fatalf("Unexpected warning: %v", w) + } + case <-time.After(time.Second): + t.Fatal("Did not get expected warning") + } + + checkLeafNodeConnected(t, s) + checkLeafNodeConnected(t, sl) +} diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 42314bf1..eb142437 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -1722,7 +1722,7 @@ func TestLeafNodeExportsImports(t *testing.T) { } } -func TestLeadNodeExportImportComplexSetup(t *testing.T) { +func TestLeafNodeExportImportComplexSetup(t *testing.T) { content := ` port: -1 operator = "./configs/nkeys/op.jwt"