mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 11:04:42 -07:00
Detection for loops with leafnodes.
We need to send the unique LDS subject to all leafnodes to properly detect setups like triangles. This will have the server who completes the loop be the one that detects the error soley based on its own loop detection subject. Otehr changes are just to fix tests that were not waiting for the new LDS sub. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1712,6 +1712,14 @@ func (a *Account) hasIssuerNoLock(issuer string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Returns the loop detection subject used for leafnodes
|
||||
func (a *Account) getLDSubject() string {
|
||||
a.mu.RLock()
|
||||
lds := a.lds
|
||||
a.mu.RUnlock()
|
||||
return lds
|
||||
}
|
||||
|
||||
// Placeholder for signaling token auth required.
|
||||
var tokenAuthReq = []*Account{}
|
||||
|
||||
|
||||
@@ -360,11 +360,11 @@ func runSolicitWithCredentials(t *testing.T, opts *Options, creds string) (*Serv
|
||||
|
||||
// Helper function to check that a leaf node has connected to our server.
|
||||
func checkLeafNodeConnected(t *testing.T, s *Server) {
|
||||
checkLeafNodeConnectedCnt(t, s, 1)
|
||||
checkLeafNodeConnectedCount(t, s, 1)
|
||||
}
|
||||
|
||||
// Helper function to check that a leaf node has connected to n server.
|
||||
func checkLeafNodeConnectedCnt(t *testing.T, s *Server, lnCons int) {
|
||||
func checkLeafNodeConnectedCount(t *testing.T, s *Server, lnCons int) {
|
||||
t.Helper()
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
if nln := s.NumLeafNodes(); nln != lnCons {
|
||||
|
||||
@@ -45,8 +45,7 @@ const leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solici
|
||||
const leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second
|
||||
|
||||
// Prefix for loop detection subject
|
||||
const leafNodeLoopDetectionSubjectPrefixOld = "lds."
|
||||
const leafNodeLoopDetectionSubjectPrefix = "$" + leafNodeLoopDetectionSubjectPrefixOld
|
||||
const leafNodeLoopDetectionSubjectPrefix = "$LDS."
|
||||
|
||||
type leaf struct {
|
||||
// Used to suppress sub and unsub interest. Same as routes but our audience
|
||||
@@ -1088,9 +1087,8 @@ func (s *Server) initLeafNodeSmap(c *client) int {
|
||||
}
|
||||
// Detect loop by subscribing to a specific subject and checking
|
||||
// if this is coming back to us.
|
||||
if c.leaf.remote == nil {
|
||||
c.leaf.smap[lds]++
|
||||
}
|
||||
c.leaf.smap[lds]++
|
||||
|
||||
lenMap := len(c.leaf.smap)
|
||||
c.mu.Unlock()
|
||||
return lenMap
|
||||
@@ -1271,6 +1269,14 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
acc := c.acc
|
||||
// Check if we have a loop.
|
||||
if string(sub.subject) == acc.getLDSubject() {
|
||||
c.mu.Unlock()
|
||||
srv.reportLeafNodeLoop(c)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check permissions if applicable.
|
||||
if !c.canExport(string(sub.subject)) {
|
||||
c.mu.Unlock()
|
||||
@@ -1295,44 +1301,10 @@ func (c *client) processLeafSub(argo []byte) (err error) {
|
||||
key := string(sub.sid)
|
||||
osub := c.subs[key]
|
||||
updateGWs := false
|
||||
acc := c.acc
|
||||
if osub == nil {
|
||||
subj := string(sub.subject)
|
||||
accUnlock := false
|
||||
// Check if we have a loop.
|
||||
if len(subj) >= len(leafNodeLoopDetectionSubjectPrefixOld) {
|
||||
subStripped := subj
|
||||
if subStripped[0] == '$' {
|
||||
subStripped = subStripped[1:]
|
||||
}
|
||||
if strings.HasPrefix(subStripped, leafNodeLoopDetectionSubjectPrefixOld) {
|
||||
// The following check (involving acc.sl) and the later insert need to be tied together
|
||||
// using the account lock, such that checking and modifying the sublist appear as one operation.
|
||||
acc.mu.Lock()
|
||||
accUnlock = true
|
||||
// There is a loop if we receive our own subscription back.
|
||||
loopFound := subj == acc.lds
|
||||
if !loopFound {
|
||||
// Or if a subscription from a different client already exists.
|
||||
if res := acc.sl.Match(subj); res != nil && len(res.psubs)+len(res.qsubs) != 0 {
|
||||
loopFound = true
|
||||
}
|
||||
}
|
||||
if loopFound {
|
||||
acc.mu.Unlock()
|
||||
c.mu.Unlock()
|
||||
srv.reportLeafNodeLoop(c)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
c.subs[key] = sub
|
||||
// Now place into the account sl.
|
||||
err := acc.sl.Insert(sub)
|
||||
if accUnlock {
|
||||
acc.mu.Unlock()
|
||||
}
|
||||
if err != nil {
|
||||
if err := acc.sl.Insert(sub); err != nil {
|
||||
delete(c.subs, key)
|
||||
c.mu.Unlock()
|
||||
c.Errorf("Could not insert subscription: %v", err)
|
||||
|
||||
@@ -785,69 +785,66 @@ func TestLeafNodeLoop(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLeafNodeLoopFromDAG(t *testing.T) {
|
||||
// we want A point to B and B to A.
|
||||
// We want B & C to point to A, A itself does not point to any other server.
|
||||
oa := DefaultOptions()
|
||||
oa.ServerName = "A"
|
||||
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
|
||||
oa.LeafNode.Port = -1
|
||||
sa := RunServer(oa)
|
||||
defer sa.Shutdown()
|
||||
|
||||
la := &captureErrorLogger{errCh: make(chan string, 10)}
|
||||
sa.SetLogger(la, false, false)
|
||||
ua, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", oa.LeafNode.Port))
|
||||
|
||||
// B will point to A
|
||||
ob := DefaultOptions()
|
||||
ob.ServerName = "B"
|
||||
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
|
||||
ob.LeafNode.Port = -1
|
||||
ob.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}}
|
||||
sb := RunServer(ob)
|
||||
defer sb.Shutdown()
|
||||
|
||||
lb := &captureErrorLogger{errCh: make(chan string, 10)}
|
||||
sb.SetLogger(lb, false, false)
|
||||
ub, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ob.LeafNode.Port))
|
||||
|
||||
checkLeafNodeConnected(t, sa)
|
||||
checkLeafNodeConnected(t, sb)
|
||||
|
||||
// C will point to A and B
|
||||
oc := DefaultOptions()
|
||||
oc.ServerName = "C"
|
||||
oc.LeafNode.ReconnectInterval = 10 * time.Millisecond
|
||||
oc.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ua}}, {URLs: []*url.URL{ub}}}
|
||||
oc.LeafNode.loopDelay = 100 * time.Millisecond // Allow logger to be attached before connecting.
|
||||
sc := RunServer(oc)
|
||||
// logger with channel can only be specified after startup and is thus not used
|
||||
|
||||
// either error channel (for a or b) may get the error, but not both.
|
||||
errCnt := 0
|
||||
lc := &captureErrorLogger{errCh: make(chan string, 10)}
|
||||
sc.SetLogger(lc, false, false)
|
||||
|
||||
errorLoop:
|
||||
for {
|
||||
select {
|
||||
case e := <-la.errCh:
|
||||
if !strings.Contains(e, "Loop") {
|
||||
t.Fatalf("Expected error about loop, got %v", e)
|
||||
}
|
||||
errCnt++
|
||||
case e := <-lb.errCh:
|
||||
if !strings.Contains(e, "Loop") {
|
||||
t.Fatalf("Expected error about loop, got %v", e)
|
||||
}
|
||||
errCnt++
|
||||
case <-time.After(2 * time.Second):
|
||||
if errCnt != 1 {
|
||||
t.Fatalf("Did not get any error regarding loop")
|
||||
}
|
||||
break errorLoop
|
||||
// We should get an error.
|
||||
select {
|
||||
case e := <-lc.errCh:
|
||||
if !strings.Contains(e, "Loop") {
|
||||
t.Fatalf("Expected error about loop, got %v", e)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("Did not get any error regarding loop")
|
||||
}
|
||||
|
||||
// C should not be connected to anything.
|
||||
checkLeafNodeConnectedCount(t, sc, 0)
|
||||
// A and B are connected to each other.
|
||||
checkLeafNodeConnectedCount(t, sa, 1)
|
||||
checkLeafNodeConnectedCount(t, sb, 1)
|
||||
|
||||
// Shutdown C and restart without the loop.
|
||||
sc.Shutdown()
|
||||
oc.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{ub}}}
|
||||
sc = RunServer(oc)
|
||||
defer sc.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, sa)
|
||||
checkLeafNodeConnectedCnt(t, sb, 2)
|
||||
checkLeafNodeConnected(t, sc)
|
||||
checkLeafNodeConnectedCount(t, sa, 1)
|
||||
checkLeafNodeConnectedCount(t, sb, 2)
|
||||
checkLeafNodeConnectedCount(t, sc, 1)
|
||||
}
|
||||
|
||||
func TestLeafCloseTLSConnection(t *testing.T) {
|
||||
|
||||
@@ -3470,8 +3470,9 @@ func TestMonitorLeafz(t *testing.T) {
|
||||
if ln.RTT == "" {
|
||||
t.Fatalf("RTT not tracked?")
|
||||
}
|
||||
if ln.NumSubs != 0 || len(ln.Subs) != 0 {
|
||||
t.Fatalf("Did not expect sub, got %v (%v)", ln.NumSubs, ln.Subs)
|
||||
// LDS should be only one.
|
||||
if ln.NumSubs != 1 || len(ln.Subs) != 1 {
|
||||
t.Fatalf("Expected 1 sub, got %v (%v)", ln.NumSubs, ln.Subs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -565,45 +564,6 @@ func TestLeafNodeNoEcho(t *testing.T) {
|
||||
expectNothing(t, lc)
|
||||
}
|
||||
|
||||
func TestLeafNodeLoop(t *testing.T) {
|
||||
s, opts := runLeafServer()
|
||||
defer s.Shutdown()
|
||||
|
||||
c := createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
|
||||
send, expect := setupConn(t, c)
|
||||
send("PING\r\n")
|
||||
expect(pongRe)
|
||||
|
||||
lc1 := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc1.Close()
|
||||
|
||||
leaf1Send, leaf1Expect := setupLeaf(t, lc1, 1)
|
||||
leaf1Send("PING\r\n")
|
||||
leaf1Expect(pongRe)
|
||||
|
||||
leaf1Send("LS+ $lds.foo\r\n")
|
||||
expectNothing(t, lc1)
|
||||
|
||||
// Same loop detection subscription from same client
|
||||
leaf1Send("LS+ $lds.foo\r\n")
|
||||
expectNothing(t, lc1)
|
||||
|
||||
lc2 := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc2.Close()
|
||||
|
||||
leaf2Send, leaf2Expect := setupLeaf(t, lc2, 2)
|
||||
leaf2Send("PING\r\n")
|
||||
leaf2Expect(pongRe)
|
||||
|
||||
// Same loop detection subscription from different client
|
||||
leaf2Send("LS+ $lds.foo\r\n")
|
||||
leaf2Expect(regexp.MustCompile(
|
||||
"-ERR 'Loop detected for leafnode account=\".G\". " +
|
||||
"Delaying attempt to reconnect for .*"))
|
||||
}
|
||||
|
||||
// Used to setup clusters of clusters for tests.
|
||||
type cluster struct {
|
||||
servers []*server.Server
|
||||
@@ -847,12 +807,12 @@ func TestLeafNodeGatewayInterestPropagation(t *testing.T) {
|
||||
defer lc.Close()
|
||||
_, leafExpect := setupConn(t, lc)
|
||||
var totalBuf []byte
|
||||
for count := 0; count != 4; {
|
||||
for count := 0; count != 5; {
|
||||
buf := leafExpect(lsubRe)
|
||||
totalBuf = append(totalBuf, buf...)
|
||||
count += len(lsubRe.FindAllSubmatch(buf, -1))
|
||||
if count > 4 {
|
||||
t.Fatalf("Expected %v matches, got %v (buf=%s)", 3, count, totalBuf)
|
||||
if count > 5 {
|
||||
t.Fatalf("Expected %v matches, got %v (buf=%s)", 4, count, totalBuf)
|
||||
}
|
||||
}
|
||||
if !strings.Contains(string(totalBuf), "foo") {
|
||||
@@ -1397,9 +1357,9 @@ func TestLeafNodeMultipleAccounts(t *testing.T) {
|
||||
|
||||
lsub, _ := ncl.SubscribeSync("foo.test")
|
||||
|
||||
// Wait for the sub to propagate.
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if subs := s.NumSubscriptions(); subs < 1 {
|
||||
// Wait for the subs to propagate. LDS + foo.test
|
||||
checkFor(t, 2*time.Second, 10*time.Millisecond, func() error {
|
||||
if subs := s.NumSubscriptions(); subs < 2 {
|
||||
return fmt.Errorf("Number of subs is %d", subs)
|
||||
}
|
||||
return nil
|
||||
@@ -1578,9 +1538,9 @@ func TestLeafNodeExportsImports(t *testing.T) {
|
||||
// So everything should be setup here. So let's test streams first.
|
||||
lsub, _ := ncl.SubscribeSync("import.foo.stream")
|
||||
|
||||
// Wait for the sub to propagate.
|
||||
// Wait for the subs to propagate.
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if subs := s.NumSubscriptions(); subs < 1 {
|
||||
if subs := s.NumSubscriptions(); subs < 2 {
|
||||
return fmt.Errorf("Number of subs is %d", subs)
|
||||
}
|
||||
return nil
|
||||
@@ -1740,9 +1700,9 @@ func TestLeadNodeExportImportComplexSetup(t *testing.T) {
|
||||
// So everything should be setup here. So let's test streams first.
|
||||
lsub, _ := ncl.SubscribeSync("import.foo.stream")
|
||||
|
||||
// Wait for the sub to propagate to s2.
|
||||
// Wait for the sub to propagate to s2. LDS + subject above.
|
||||
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
|
||||
if acc1.RoutedSubs() == 0 {
|
||||
if acc1.RoutedSubs() != 2 {
|
||||
return fmt.Errorf("Still no routed subscription")
|
||||
}
|
||||
return nil
|
||||
@@ -2440,7 +2400,7 @@ func TestLeafNodeSendsRemoteSubsOnConnect(t *testing.T) {
|
||||
lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
|
||||
defer lc.Close()
|
||||
|
||||
setupLeaf(t, lc, 2)
|
||||
setupLeaf(t, lc, 3)
|
||||
}
|
||||
|
||||
func TestLeafNodeServiceImportLikeNGS(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user