Merge pull request #1170 from nats-io/fix_detect_leafnode_loop

[FIXED] Detect loop between LeafNode servers
This commit is contained in:
Ivan Kozlovic
2019-10-29 18:35:20 -06:00
committed by GitHub
6 changed files with 161 additions and 32 deletions

View File

@@ -66,6 +66,7 @@ type Account struct {
expired bool
signingKeys []string
srv *Server // server this account is registered with (possibly nil)
lds string // loop detection subject for leaf nodes
}
// Account based limits.

View File

@@ -34,11 +34,18 @@ import (
"time"
"github.com/nats-io/nkeys"
"github.com/nats-io/nuid"
)
const (
	// Warning logged when a user configures leafnode TLS as insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// How long the reconnect of a solicited connection is delayed
	// once a loop has been detected.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// Prefix of the subject used for loop detection.
	leafNodeLoopDetectionSubjectPrefix = "lds."
)
type leaf struct {
// Used to suppress sub and unsub interest. Same as routes but our audience
// here is tied to this leaf node. This will hold all subscriptions except this
@@ -52,11 +59,12 @@ type leaf struct {
type leafNodeCfg struct {
sync.RWMutex
*RemoteLeafOpts
urls []*url.URL
curURL *url.URL
tlsName string
username string
password string
urls []*url.URL
curURL *url.URL
tlsName string
username string
password string
loopDelay time.Duration // A loop condition was detected
}
// Check to see if this is a solicited leafnode. We do special processing for solicited.
@@ -177,6 +185,24 @@ func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
return cfg.curURL
}
// getLoopDelay returns how long the server should wait before attempting
// to solicit a remote leafnode connection following the detection of a
// loop. It returns 0 if no loop was detected.
// The value is read while holding the configuration's read lock.
func (cfg *leafNodeCfg) getLoopDelay() time.Duration {
	cfg.RLock()
	defer cfg.RUnlock()
	return cfg.loopDelay
}
// resetLoopDelay sets the loop delay back to 0 while holding the
// configuration's write lock.
func (cfg *leafNodeCfg) resetLoopDelay() {
	cfg.Lock()
	defer cfg.Unlock()
	cfg.loopDelay = 0
}
// Ensure that non-exported options (used in tests) have
// been properly set.
func (s *Server) setLeafNodeNonExportedOptions() {
@@ -207,6 +233,15 @@ func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool)
resolver := s.leafNodeOpts.resolver
s.mu.Unlock()
if loopDelay := remote.getLoopDelay(); loopDelay > 0 {
select {
case <-time.After(loopDelay):
case <-s.quitCh:
return
}
remote.resetLoopDelay()
}
var conn net.Conn
const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
@@ -921,7 +956,7 @@ func (s *Server) initLeafNodeSmap(c *client) {
_subs := [32]*subscription{}
subs := _subs[:0]
ims := []string{}
acc.mu.RLock()
acc.mu.Lock()
accName := acc.Name
// If we are solicited we only send interest for local clients.
if c.isSolicitedLeafNode() {
@@ -934,7 +969,13 @@ func (s *Server) initLeafNodeSmap(c *client) {
for isubj := range acc.imports.services {
ims = append(ims, isubj)
}
acc.mu.RUnlock()
// Create a unique subject that will be used for loop detection.
lds := acc.lds
if lds == _EMPTY_ {
lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
acc.lds = lds
}
acc.mu.Unlock()
// Now check for gateway interest. Leafnodes will put this into
// the proper mode to propagate, but they are not held in the account.
@@ -974,6 +1015,11 @@ func (s *Server) initLeafNodeSmap(c *client) {
if applyGlobalRouting {
c.leaf.smap[gwReplyPrefix+"*.>"]++
}
// Detect loop by subscribing to a specific subject and checking
// if this is coming back to us.
if c.leaf.remote == nil {
c.leaf.smap[lds]++
}
c.mu.Unlock()
}
@@ -1154,6 +1200,13 @@ func (c *client) processLeafSub(argo []byte) (err error) {
return nil
}
// Check if we have a loop.
if string(sub.subject) == c.acc.lds {
c.mu.Unlock()
srv.reportLeafNodeLoop(c)
return nil
}
// Check permissions if applicable.
if !c.canExport(string(sub.subject)) {
c.mu.Unlock()
@@ -1217,6 +1270,24 @@ func (c *client) processLeafSub(argo []byte) (err error) {
return nil
}
// reportLeafNodeLoop handles a loop detected on the leafnode connection c.
// When c has a remote configuration attached (c.leaf.remote is non-nil),
// the reconnect delay is recorded in that configuration. An error naming
// the account and the delay is then handed to c.sendErrAndErr.
func (s *Server) reportLeafNodeLoop(c *client) {
	// Use the non-exported loopDelay option when it is set, otherwise
	// fall back to the default delay.
	reconnectDelay := s.getOpts().LeafNode.loopDelay
	if reconnectDelay == 0 {
		reconnectDelay = leafNodeReconnectDelayAfterLoopDetected
	}

	c.mu.Lock()
	if remote := c.leaf.remote; remote != nil {
		// The remote configuration has its own lock, taken while the
		// client lock is held.
		remote.Lock()
		remote.loopDelay = reconnectDelay
		remote.Unlock()
	}
	account := c.acc.Name
	c.mu.Unlock()

	// The error is sent only after the client lock has been released.
	const loopErrFmt = "Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v"
	c.sendErrAndErr(fmt.Sprintf(loopErrFmt, account, reconnectDelay))
}
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
func (c *client) processLeafUnsub(arg []byte) error {
c.traceInOp("LS-", arg)

View File

@@ -739,3 +739,44 @@ func TestLeafNodeBasicAuthMultiple(t *testing.T) {
s3, _ := RunServerWithConfig(conf)
defer s3.Shutdown()
}
// TestLeafNodeLoop configures two servers, A and B, whose leafnode remotes
// point at each other, and expects an error mentioning "Loop" to be logged
// by server A. Server B is then restarted without any remote and
// checkLeafNodeConnected is invoked on A.
func TestLeafNodeLoop(t *testing.T) {
// This test requires that we set the ports to known values because
// we want A to point to B and B to point to A.
oa := DefaultOptions()
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
oa.LeafNode.Port = 1234
// A solicits a leafnode connection to B's leafnode port.
ub, _ := url.Parse("nats://127.0.0.1:5678")
oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}}
// Non-exported option: replaces the default reconnect delay that is
// applied after a loop has been detected, to keep the test short.
oa.LeafNode.loopDelay = 50 * time.Millisecond
sa := RunServer(oa)
defer sa.Shutdown()
// Install a logger on A that forwards messages to a buffered channel
// (presumably only error-level logs; captureErrorLogger is defined
// elsewhere).
l := &captureErrorLogger{errCh: make(chan string, 10)}
sa.SetLogger(l, false, false)
ob := DefaultOptions()
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
ob.LeafNode.Port = 5678
// B solicits a leafnode connection back to A, which creates the loop.
ua, _ := url.Parse("nats://127.0.0.1:1234")
ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}}
ob.LeafNode.loopDelay = 50 * time.Millisecond
sb := RunServer(ob)
defer sb.Shutdown()
// The first message captured on A must mention the loop; fail if
// nothing is received within 2 seconds.
select {
case e := <-l.errCh:
if !strings.Contains(e, "Loop") {
t.Fatalf("Expected error about loop, got %v", e)
}
case <-time.After(2 * time.Second):
t.Fatalf("Did not get any error regarding loop")
}
// Break the loop: restart B with no remotes so that only A solicits
// a connection.
sb.Shutdown()
ob.LeafNode.Remotes = nil
sb = RunServer(ob)
// Deferred again because sb now refers to the new server instance.
defer sb.Shutdown()
// With the loop gone, A is expected to have its leafnode connection
// (presumably to the restarted B) established.
checkLeafNodeConnected(t, sa)
}

View File

@@ -3329,14 +3329,26 @@ func TestMonitorLeafz(t *testing.T) {
if ln.RTT == "" {
t.Fatalf("RTT not tracked?")
}
if ln.NumSubs != 2 {
t.Fatalf("Expected 2 subs, got %v", ln.NumSubs)
if ln.NumSubs != 3 {
t.Fatalf("Expected 3 subs, got %v", ln.NumSubs)
}
if len(ln.Subs) != 2 {
if len(ln.Subs) != 3 {
t.Fatalf("Expected subs to be returned, got %v", len(ln.Subs))
}
if (ln.Subs[0] != "foo" || ln.Subs[1] != "bar") && (ln.Subs[0] != "bar" || ln.Subs[1] != "foo") {
t.Fatalf("Unexpected subjects: %v", ln.Subs)
var foundFoo bool
var foundBar bool
for _, sub := range ln.Subs {
if sub == "foo" {
foundFoo = true
} else if sub == "bar" {
foundBar = true
}
}
if !foundFoo {
t.Fatal("Did not find subject foo")
}
if !foundBar {
t.Fatal("Did not find subject bar")
}
}
}
@@ -3345,8 +3357,8 @@ func TestMonitorLeafz(t *testing.T) {
for pollMode := 1; pollMode < 2; pollMode++ {
l := pollLeafz(t, sa, pollMode, pollURL, nil)
for _, ln := range l.Leafs {
if ln.NumSubs != 2 {
t.Fatalf("Number of subs should be 2, got %v", ln.NumSubs)
if ln.NumSubs != 3 {
t.Fatalf("Number of subs should be 3, got %v", ln.NumSubs)
}
if len(ln.Subs) != 0 {
t.Fatalf("Subs should not have been returned, got %v", ln.Subs)

View File

@@ -125,6 +125,7 @@ type LeafNodeOpts struct {
// Not exported, for tests.
resolver netResolver
dialTimeout time.Duration
loopDelay time.Duration
}
// RemoteLeafOpts are options for connecting to a remote server as a leaf node.

View File

@@ -163,6 +163,15 @@ func TestLeafNodeRequiresConnect(t *testing.T) {
expectDisconnect(t, lc)
}
// setupLeaf sets up the connection through setupConn and then consumes
// expectedSubs protocols matching lsubRe. A loop detection subscription is
// sent on setup, so expectedSubs must count it in addition to the
// subscriptions the caller expects. The send and expect functions obtained
// from setupConn are returned.
func setupLeaf(t *testing.T, lc net.Conn, expectedSubs int) (sendFun, expectFun) {
	t.Helper()
	leafSend, leafExpect := setupConn(t, lc)
	// Drain the initial subscriptions so that callers start from a clean
	// state.
	expectNumberOfProtos(t, leafExpect, lsubRe, expectedSubs)
	return leafSend, leafExpect
}
func TestLeafNodeSendsSubsAfterConnect(t *testing.T) {
s, opts := runLeafServer()
defer s.Shutdown()
@@ -182,12 +191,9 @@ func TestLeafNodeSendsSubsAfterConnect(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
_, leafExpect := setupConn(t, lc)
matches := lsubRe.FindAllSubmatch(leafExpect(lsubRe), -1)
// This should compress down to 1 for foo, 1 for bar, and 1 for foo [baz]
if len(matches) != 3 {
t.Fatalf("Expected 3 results, got %d", len(matches))
}
// and one for the loop detection subject.
setupLeaf(t, lc, 4)
}
func TestLeafNodeSendsSubsOngoing(t *testing.T) {
@@ -204,7 +210,7 @@ func TestLeafNodeSendsSubsOngoing(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConn(t, lc)
leafSend, leafExpect := setupLeaf(t, lc, 1)
leafSend("PING\r\n")
leafExpect(pongRe)
@@ -246,7 +252,7 @@ func TestLeafNodeSubs(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConn(t, lc)
leafSend, leafExpect := setupLeaf(t, lc, 1)
leafSend("PING\r\n")
leafExpect(pongRe)
@@ -328,7 +334,7 @@ func TestLeafNodeMsgDelivery(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConn(t, lc)
leafSend, leafExpect := setupLeaf(t, lc, 1)
leafSend("PING\r\n")
leafExpect(pongRe)
@@ -398,7 +404,7 @@ func TestLeafNodeAndRoutes(t *testing.T) {
lc := createLeafConn(t, optsA.LeafNode.Host, optsA.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConn(t, lc)
leafSend, leafExpect := setupLeaf(t, lc, 1)
leafSend("PING\r\n")
leafExpect(pongRe)
@@ -493,7 +499,7 @@ func TestLeafNodeNoEcho(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConn(t, lc)
leafSend, leafExpect := setupLeaf(t, lc, 1)
leafSend("PING\r\n")
leafExpect(pongRe)
@@ -699,9 +705,8 @@ func TestLeafNodeGatewaySendsSystemEvent(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConn(t, lc)
// This is for our global responses since we are setting up GWs above.
leafExpect(lsubRe)
leafSend, leafExpect := setupLeaf(t, lc, 2)
leafSend("PING\r\n")
leafExpect(pongRe)
@@ -784,9 +789,8 @@ func TestLeafNodeWithRouteAndGateway(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConn(t, lc)
// This is for our global responses since we are setting up GWs above.
leafExpect(lsubRe)
leafSend, leafExpect := setupLeaf(t, lc, 2)
leafSend("PING\r\n")
leafExpect(pongRe)
@@ -924,6 +928,7 @@ func TestLeafNodeBasicAuth(t *testing.T) {
lc = createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConnWithUserPass(t, lc, "derek", "s3cr3t!")
leafExpect(lsubRe)
leafSend("PING\r\n")
leafExpect(pongRe)
@@ -2126,9 +2131,8 @@ func TestLeafNodeSwitchGatewayToInterestModeOnly(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
leafSend, leafExpect := setupConn(t, lc)
// This is for our global responses since we are setting up GWs above.
leafExpect(lsubRe)
leafSend, leafExpect := setupLeaf(t, lc, 2)
leafSend("PING\r\n")
leafExpect(pongRe)
}
@@ -2234,8 +2238,7 @@ func TestLeafNodeSendsRemoteSubsOnConnect(t *testing.T) {
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()
_, leafExpect := setupConn(t, lc)
leafExpect(lsubRe)
setupLeaf(t, lc, 2)
}
func TestLeafNodeServiceImportLikeNGS(t *testing.T) {