mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #1607 from nats-io/fix_leafnode_loop_detected
[FIXED] Prevent LeafNode loop detection on early reconnect
This commit is contained in:
@@ -1035,6 +1035,9 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
return ErrWrongGateway
|
||||
}
|
||||
|
||||
// Check for stale connection from same server/account
|
||||
c.replaceOldLeafNodeConnIfNeeded(s, proto)
|
||||
|
||||
// Leaf Nodes do not do echo or verbose or pedantic.
|
||||
c.opts.Verbose = false
|
||||
c.opts.Echo = false
|
||||
@@ -1068,6 +1071,42 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// Invoked from a server accepting a leafnode connection. It looks for a possible
|
||||
// existing leafnode connection from the same server with the same account, and
|
||||
// if it finds one, closes it so that the new one is accepted and not reported as
|
||||
// forming a cycle.
|
||||
//
|
||||
// This must be invoked for LEAF client types, and on the server accepting the connection.
|
||||
//
|
||||
// No server nor client lock held on entry.
|
||||
func (c *client) replaceOldLeafNodeConnIfNeeded(s *Server, connInfo *leafConnectInfo) {
|
||||
var accName string
|
||||
c.mu.Lock()
|
||||
if c.acc != nil {
|
||||
accName = c.acc.Name
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
var old *client
|
||||
s.mu.Lock()
|
||||
for _, ol := range s.leafs {
|
||||
ol.mu.Lock()
|
||||
// We check for empty because in some test we may send empty CONNECT{}
|
||||
if ol.opts.Name == connInfo.Name && connInfo.Name != _EMPTY_ && ol.acc.Name == accName {
|
||||
old = ol
|
||||
}
|
||||
ol.mu.Unlock()
|
||||
if old != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
if old != nil {
|
||||
old.Warnf("Replacing connection from same server")
|
||||
old.closeConnection(ReadError)
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the remote cluster name. This is set only once so does not require a lock.
|
||||
func (c *client) remoteCluster() string {
|
||||
if c.leaf == nil {
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -1650,3 +1651,154 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) {
|
||||
t.Fatalf("Expected a different id, got the same")
|
||||
}
|
||||
}
|
||||
|
||||
// proxyAcceptDetectFailureLate is a TCP proxy placed between a soliciting
// leafnode and the accepting server. It lets the test close the leaf-side
// connection without the accept side noticing right away, simulating a late
// detection of the connection failure on the accepting server.
type proxyAcceptDetectFailureLate struct {
	sync.Mutex                // protects l, srvs and leaf below
	wg         sync.WaitGroup // waits for the accept-loop goroutine started in run()
	acceptPort int            // port of the real leafnode accept listener to dial
	l          net.Listener   // the proxy's own listener
	srvs       []net.Conn     // connections opened to the real server (accept side)
	leaf       net.Conn       // most recently accepted connection from the leafnode side
}
|
||||
|
||||
func (p *proxyAcceptDetectFailureLate) run(t *testing.T) int {
|
||||
l, err := natsListen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("Error on listen: %v", err)
|
||||
}
|
||||
p.Lock()
|
||||
p.l = l
|
||||
p.Unlock()
|
||||
port := l.Addr().(*net.TCPAddr).Port
|
||||
p.wg.Add(1)
|
||||
go func() {
|
||||
defer p.wg.Done()
|
||||
defer l.Close()
|
||||
defer func() {
|
||||
p.Lock()
|
||||
for _, c := range p.srvs {
|
||||
c.Close()
|
||||
}
|
||||
p.Unlock()
|
||||
}()
|
||||
for {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
srv, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", p.acceptPort))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
p.Lock()
|
||||
p.leaf = c
|
||||
p.srvs = append(p.srvs, srv)
|
||||
p.Unlock()
|
||||
|
||||
transfer := func(c1, c2 net.Conn) {
|
||||
var buf [1024]byte
|
||||
for {
|
||||
n, err := c1.Read(buf[:])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if _, err := c2.Write(buf[:n]); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go transfer(srv, c)
|
||||
go transfer(c, srv)
|
||||
}
|
||||
}()
|
||||
return port
|
||||
}
|
||||
|
||||
func (p *proxyAcceptDetectFailureLate) close() {
|
||||
p.Lock()
|
||||
p.l.Close()
|
||||
p.Unlock()
|
||||
|
||||
p.wg.Wait()
|
||||
}
|
||||
|
||||
// oldConnReplacedLogger captures Errorf and Warnf log calls on channels so
// the test can assert on the presence or absence of specific log messages.
// All other log levels fall through to the embedded DummyLogger.
type oldConnReplacedLogger struct {
	DummyLogger
	errCh  chan string // receives formatted error log messages
	warnCh chan string // receives formatted warning log messages
}
|
||||
|
||||
func (l *oldConnReplacedLogger) Errorf(format string, v ...interface{}) {
|
||||
select {
|
||||
case l.errCh <- fmt.Sprintf(format, v...):
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (l *oldConnReplacedLogger) Warnf(format string, v ...interface{}) {
|
||||
select {
|
||||
case l.warnCh <- fmt.Sprintf(format, v...):
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// This test will simulate that the accept side does not detect the connection
// has been closed early enough. The soliciting side will attempt to reconnect
// and we should not be getting the "loop detected" error.
func TestLeafNodeLoopDetectedDueToReconnect(t *testing.T) {
	o := DefaultOptions()
	o.LeafNode.Host = "127.0.0.1"
	o.LeafNode.Port = -1
	s := RunServer(o)
	defer s.Shutdown()

	// Capture error and warning logs emitted by the accepting server.
	l := &oldConnReplacedLogger{errCh: make(chan string, 10), warnCh: make(chan string, 10)}
	s.SetLogger(l, false, false)

	// Insert a proxy between the remote and the accepting server so that the
	// leaf side can be severed without the accept side noticing immediately.
	p := &proxyAcceptDetectFailureLate{acceptPort: o.LeafNode.Port}
	defer p.close()
	port := p.run(t)

	aurl, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", port))
	if err != nil {
		t.Fatalf("Error parsing url: %v", err)
	}
	ol := DefaultOptions()
	ol.Cluster.Name = "cde"
	// Short reconnect interval so the remote reconnects before the accepting
	// server has detected that the old connection is gone.
	ol.LeafNode.ReconnectInterval = 50 * time.Millisecond
	ol.LeafNode.Remotes = []*RemoteLeafOpts{&RemoteLeafOpts{URLs: []*url.URL{aurl}}}
	sl := RunServer(ol)
	defer sl.Shutdown()

	checkLeafNodeConnected(t, s)
	checkLeafNodeConnected(t, sl)

	// Cause disconnect client side...
	p.Lock()
	p.leaf.Close()
	p.Unlock()

	// Make sure we did not get the loop detected error
	select {
	case e := <-l.errCh:
		if strings.Contains(e, "Loop detected") {
			t.Fatalf("Loop detected: %v", e)
		}
	case <-time.After(250 * time.Millisecond):
		// We are ok
	}

	// Now make sure we got the warning
	select {
	case w := <-l.warnCh:
		if !strings.Contains(w, "Replacing connection from same server") {
			t.Fatalf("Unexpected warning: %v", w)
		}
	case <-time.After(time.Second):
		t.Fatal("Did not get expected warning")
	}

	// Both sides should have re-established the leafnode connection.
	checkLeafNodeConnected(t, s)
	checkLeafNodeConnected(t, sl)
}
|
||||
|
||||
@@ -1722,7 +1722,7 @@ func TestLeafNodeExportsImports(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeadNodeExportImportComplexSetup(t *testing.T) {
|
||||
func TestLeafNodeExportImportComplexSetup(t *testing.T) {
|
||||
content := `
|
||||
port: -1
|
||||
operator = "./configs/nkeys/op.jwt"
|
||||
|
||||
Reference in New Issue
Block a user