fix daisy chained leaf node subject propagation issue. (#2468)

fixes #2448 

initLeafNodeSmapAndSendSubs did not pick up enough local subscriptions.

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2021-08-25 18:10:09 -04:00
committed by GitHub
parent 295280c676
commit 41a253dabb
6 changed files with 96 additions and 21 deletions

View File

@@ -1481,7 +1481,7 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
// If we are solicited we only send interest for local clients.
if c.isSpokeLeafNode() {
acc.sl.localSubs(&subs)
acc.sl.localSubs(&subs, true)
} else {
acc.sl.All(&subs)
}

View File

@@ -3965,3 +3965,75 @@ leafnodes:{
test("pubdeny", ncA, ncL, nil, "A", false)
})
}
func TestLeafNodeInterestPropagationDaisychain(t *testing.T) {
aTmpl := `
port: %d
leafnodes {
port: %d
}
}`
confA := createConfFile(t, []byte(fmt.Sprintf(aTmpl, -1, -1)))
defer removeFile(t, confA)
sA, _ := RunServerWithConfig(confA)
defer sA.Shutdown()
aPort := sA.opts.Port
aLeafPort := sA.opts.LeafNode.Port
confB := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
port: -1
remotes = [{
url:"nats://127.0.0.1:%d"
}]
}`, aLeafPort)))
defer removeFile(t, confB)
sB, _ := RunServerWithConfig(confB)
defer sB.Shutdown()
confC := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
port: -1
remotes = [{url:"nats://127.0.0.1:%d"}]
}`, sB.opts.LeafNode.Port)))
defer removeFile(t, confC)
sC, _ := RunServerWithConfig(confC)
defer sC.Shutdown()
checkLeafNodeConnectedCount(t, sC, 1)
checkLeafNodeConnectedCount(t, sB, 2)
checkLeafNodeConnectedCount(t, sA, 1)
ncC := natsConnect(t, sC.ClientURL())
defer ncC.Close()
_, err := ncC.SubscribeSync("foo")
require_NoError(t, err)
require_NoError(t, ncC.Flush())
checkSubInterest(t, sC, "$G", "foo", time.Second)
checkSubInterest(t, sB, "$G", "foo", time.Second)
checkSubInterest(t, sA, "$G", "foo", time.Second)
ncA := natsConnect(t, sA.ClientURL())
defer ncA.Close()
sA.Shutdown()
sA.WaitForShutdown()
confAA := createConfFile(t, []byte(fmt.Sprintf(aTmpl, aPort, aLeafPort)))
defer removeFile(t, confAA)
sAA, _ := RunServerWithConfig(confAA)
defer sAA.Shutdown()
checkLeafNodeConnectedCount(t, sAA, 1)
checkLeafNodeConnectedCount(t, sB, 2)
checkLeafNodeConnectedCount(t, sC, 1)
checkSubInterest(t, sC, "$G", "foo", time.Second)
checkSubInterest(t, sB, "$G", "foo", time.Second)
checkSubInterest(t, sAA, "$G", "foo", time.Second) // failure issue 2448
}

View File

@@ -941,7 +941,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
return true
}
slStats.add(acc.sl.Stats())
acc.sl.localSubs(&subs)
acc.sl.localSubs(&subs, false)
return true
})

View File

@@ -1722,7 +1722,7 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
deleteRoutedSubs []*subscription
)
// FIXME(dlc) - Change for accounts.
gacc.sl.localSubs(&localSubs)
gacc.sl.localSubs(&localSubs, false)
// Go through all local subscriptions
for _, sub := range localSubs {

View File

@@ -710,7 +710,7 @@ func (c *client) updateRemoteRoutePerms(sl *Sublist, info *Info) {
_localSubs [4096]*subscription
localSubs = _localSubs[:0]
)
sl.localSubs(&localSubs)
sl.localSubs(&localSubs, false)
c.sendRouteSubProtos(localSubs, false, func(sub *subscription) bool {
subj := string(sub.subject)

View File

@@ -1368,51 +1368,54 @@ func matchLiteral(literal, subject string) bool {
return li >= ll
}
func addLocalSub(sub *subscription, subs *[]*subscription) {
if sub != nil && sub.client != nil &&
(sub.client.kind == CLIENT || sub.client.kind == SYSTEM || sub.client.kind == JETSTREAM || sub.client.kind == ACCOUNT) && sub.im == nil {
*subs = append(*subs, sub)
func addLocalSub(sub *subscription, subs *[]*subscription, includeLeafHubs bool) {
if sub != nil && sub.client != nil && sub.im == nil {
kind := sub.client.kind
if kind == CLIENT || kind == SYSTEM || kind == JETSTREAM || kind == ACCOUNT ||
(includeLeafHubs && sub.client.isHubLeafNode() /* implied kind==LEAF */) {
*subs = append(*subs, sub)
}
}
}
func (s *Sublist) addNodeToSubs(n *node, subs *[]*subscription) {
func (s *Sublist) addNodeToSubs(n *node, subs *[]*subscription, includeLeafHubs bool) {
// Normal subscriptions
if n.plist != nil {
for _, sub := range n.plist {
addLocalSub(sub, subs)
addLocalSub(sub, subs, includeLeafHubs)
}
} else {
for _, sub := range n.psubs {
addLocalSub(sub, subs)
addLocalSub(sub, subs, includeLeafHubs)
}
}
// Queue subscriptions
for _, qr := range n.qsubs {
for _, sub := range qr {
addLocalSub(sub, subs)
addLocalSub(sub, subs, includeLeafHubs)
}
}
}
func (s *Sublist) collectLocalSubs(l *level, subs *[]*subscription) {
func (s *Sublist) collectLocalSubs(l *level, subs *[]*subscription, includeLeafHubs bool) {
for _, n := range l.nodes {
s.addNodeToSubs(n, subs)
s.collectLocalSubs(n.next, subs)
s.addNodeToSubs(n, subs, includeLeafHubs)
s.collectLocalSubs(n.next, subs, includeLeafHubs)
}
if l.pwc != nil {
s.addNodeToSubs(l.pwc, subs)
s.collectLocalSubs(l.pwc.next, subs)
s.addNodeToSubs(l.pwc, subs, includeLeafHubs)
s.collectLocalSubs(l.pwc.next, subs, includeLeafHubs)
}
if l.fwc != nil {
s.addNodeToSubs(l.fwc, subs)
s.collectLocalSubs(l.fwc.next, subs)
s.addNodeToSubs(l.fwc, subs, includeLeafHubs)
s.collectLocalSubs(l.fwc.next, subs, includeLeafHubs)
}
}
// Return all local client subscriptions. Use the supplied slice.
func (s *Sublist) localSubs(subs *[]*subscription) {
func (s *Sublist) localSubs(subs *[]*subscription, includeLeafHubs bool) {
s.RLock()
s.collectLocalSubs(s.root, subs)
s.collectLocalSubs(s.root, subs, includeLeafHubs)
s.RUnlock()
}