From 2605ae71eddbd8c51f444c7cd761e88a9970539b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 22 Sep 2020 16:58:36 -0600 Subject: [PATCH] [FIXED] Prevent LeafNode loop detection on early reconnect If the soliciting side detects the disconnect and attempts to reconnect but the accepting side did not yet close the connection, a "loop detected" error would be reported and the soliciting server would not try to reconnect for 30 seconds. Made a change so that the accepting server checks for existing leafnode connection for the same server and same account, and if it is found, close the "old" connection so it is replaced by the "new" one. Resolves #1606 Signed-off-by: Ivan Kozlovic --- server/leafnode.go | 39 +++++++++++ server/leafnode_test.go | 152 ++++++++++++++++++++++++++++++++++++++++ test/leafnode_test.go | 2 +- 3 files changed, 192 insertions(+), 1 deletion(-) diff --git a/server/leafnode.go b/server/leafnode.go index 358d07c3..f7d7cd84 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1035,6 +1035,9 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return ErrWrongGateway } + // Check for stale connection from same server/account + c.replaceOldLeafNodeConnIfNeeded(s, proto) + // Leaf Nodes do not do echo or verbose or pedantic. c.opts.Verbose = false c.opts.Echo = false @@ -1068,6 +1071,42 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro return nil } +// Invoked from a server accepting a leafnode connection. It looks for a possible +// existing leafnode connection from the same server with the same account, and +// if it finds one, closes it so that the new one is accepted and not reported as +// forming a cycle. +// +// This must be invoked for LEAF client types, and on the server accepting the connection. +// +// No server nor client lock held on entry. +func (c *client) replaceOldLeafNodeConnIfNeeded(s *Server, connInfo *leafConnectInfo) { + var accName string + c.mu.Lock() + if c.acc != nil { + accName = c.acc.Name + } + c.mu.Unlock() + + var old *client + s.mu.Lock() + for _, ol := range s.leafs { + ol.mu.Lock() + // We check for empty because in some test we may send empty CONNECT{} + if ol.opts.Name == connInfo.Name && connInfo.Name != _EMPTY_ && ol.acc.Name == accName { + old = ol + } + ol.mu.Unlock() + if old != nil { + break + } + } + s.mu.Unlock() + if old != nil { + old.Warnf("Replacing connection from same server") + old.closeConnection(ReadError) + } +} + // Returns the remote cluster name. This is set only once so does not require a lock. func (c *client) remoteCluster() string { if c.leaf == nil { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 0555908b..d9c0f626 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -22,6 +22,7 @@ import ( "net/url" "os" "strings" + "sync" "sync/atomic" "testing" "time" @@ -1650,3 +1651,154 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) { t.Fatalf("Expected a different id, got the same") } } + +type proxyAcceptDetectFailureLate struct { + sync.Mutex + wg sync.WaitGroup + acceptPort int + l net.Listener + srvs []net.Conn + leaf net.Conn +} + +func (p *proxyAcceptDetectFailureLate) run(t *testing.T) int { + l, err := natsListen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("Error on listen: %v", err) + } + p.Lock() + p.l = l + p.Unlock() + port := l.Addr().(*net.TCPAddr).Port + p.wg.Add(1) + go func() { + defer p.wg.Done() + defer l.Close() + defer func() { + p.Lock() + for _, c := range p.srvs { + c.Close() + } + p.Unlock() + }() + for { + c, err := l.Accept() + if err != nil { + return + } + srv, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", p.acceptPort)) + if err != nil { + return + } + p.Lock() + p.leaf = c + p.srvs = append(p.srvs, srv) + p.Unlock() + + transfer := func(c1, c2 net.Conn) { + var buf [1024]byte + for { + n, err := c1.Read(buf[:]) + if err != nil { + return + } + if _, err := c2.Write(buf[:n]); err != nil { + return + } + } + } + + go transfer(srv, c) + go transfer(c, srv) + } + }() + return port +} + +func (p *proxyAcceptDetectFailureLate) close() { + p.Lock() + p.l.Close() + p.Unlock() + + p.wg.Wait() +} + +type oldConnReplacedLogger struct { + DummyLogger + errCh chan string + warnCh chan string +} + +func (l *oldConnReplacedLogger) Errorf(format string, v ...interface{}) { + select { + case l.errCh <- fmt.Sprintf(format, v...): + default: + } +} + +func (l *oldConnReplacedLogger) Warnf(format string, v ...interface{}) { + select { + case l.warnCh <- fmt.Sprintf(format, v...): + default: + } +} + +// This test will simulate that the accept side does not detect the connection +// has been closed early enough. The soliciting side will attempt to reconnect +// and we should not be getting the "loop detected" error. +func TestLeafNodeLoopDetectedDueToReconnect(t *testing.T) { + o := DefaultOptions() + o.LeafNode.Host = "127.0.0.1" + o.LeafNode.Port = -1 + s := RunServer(o) + defer s.Shutdown() + + l := &oldConnReplacedLogger{errCh: make(chan string, 10), warnCh: make(chan string, 10)} + s.SetLogger(l, false, false) + + p := &proxyAcceptDetectFailureLate{acceptPort: o.LeafNode.Port} + defer p.close() + port := p.run(t) + + aurl, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", port)) + if err != nil { + t.Fatalf("Error parsing url: %v", err) + } + ol := DefaultOptions() + ol.Cluster.Name = "cde" + ol.LeafNode.ReconnectInterval = 50 * time.Millisecond + ol.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{aurl}}} + sl := RunServer(ol) + defer sl.Shutdown() + + checkLeafNodeConnected(t, s) + checkLeafNodeConnected(t, sl) + + // Cause disconnect client side... + p.Lock() + p.leaf.Close() + p.Unlock() + + // Make sure we did not get the loop detected error + select { + case e := <-l.errCh: + if strings.Contains(e, "Loop detected") { + t.Fatalf("Loop detected: %v", e) + } + case <-time.After(250 * time.Millisecond): + // We are ok + } + + // Now make sure we got the warning + select { + case w := <-l.warnCh: + if !strings.Contains(w, "Replacing connection from same server") { + t.Fatalf("Unexpected warning: %v", w) + } + case <-time.After(time.Second): + t.Fatal("Did not get expected warning") + } + + checkLeafNodeConnected(t, s) + checkLeafNodeConnected(t, sl) +} diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 42314bf1..eb142437 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -1722,7 +1722,7 @@ func TestLeafNodeExportsImports(t *testing.T) { } } -func TestLeadNodeExportImportComplexSetup(t *testing.T) { +func TestLeafNodeExportImportComplexSetup(t *testing.T) { content := ` port: -1 operator = "./configs/nkeys/op.jwt"