From 21a9bfa1d857579fcdab7bf69de32fcb13d79d20 Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Mon, 5 Apr 2021 14:29:17 -0600
Subject: [PATCH] [FIXED] Leafnode: incorrect loop detection in multi-cluster setup

If leafnodes from a cluster were to reconnect to a server in a different
cluster, it was possible for that server to send the leafnodes some of
their own subscriptions, which could cause an improper loop detection
error. There was also a defect that caused subscriptions sent over routes
on behalf of leafnode subscriptions to be registered under the wrong key,
which would lead to those subscriptions not being properly removed on
route disconnect. Finally, during route disconnect, the leafnodes map was
not updated. This PR fixes that too.

Signed-off-by: Ivan Kozlovic
---
 server/leafnode.go      |   5 +-
 server/leafnode_test.go | 219 ++++++++++++++++++++++++++++++++++++++++
 server/route.go         |   8 +-
 3 files changed, 228 insertions(+), 4 deletions(-)

diff --git a/server/leafnode.go b/server/leafnode.go
index 6adc0186..19583e30 100644
--- a/server/leafnode.go
+++ b/server/leafnode.go
@@ -1256,10 +1256,13 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
 
 	// Now walk the results and add them to our smap
 	c.mu.Lock()
+	rc := c.leaf.remoteCluster
 	c.leaf.smap = make(map[string]int32)
 	for _, sub := range subs {
 		// We ignore ourselves here.
-		if c != sub.client {
+		// Also don't add the subscription if it has an origin cluster and the
+		// cluster name matches that of the client we are sending to.
+		if c != sub.client && (sub.origin == nil || (string(sub.origin) != rc)) {
 			c.leaf.smap[keyFromSub(sub)]++
 			if c.leaf.tsub == nil {
 				c.leaf.tsub = make(map[*subscription]struct{})
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index 61fad5d4..0a69b490 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -3066,3 +3066,222 @@ func TestLeafNodeStreamImport(t *testing.T) {
 
 	natsNexMsg(t, sub, time.Second)
 }
+
+func TestLeafNodeRouteSubWithOrigin(t *testing.T) {
+	lo1 := DefaultOptions()
+	lo1.LeafNode.Host = "127.0.0.1"
+	lo1.LeafNode.Port = -1
+	lo1.Cluster.Name = "local"
+	lo1.Cluster.Host = "127.0.0.1"
+	lo1.Cluster.Port = -1
+	l1 := RunServer(lo1)
+	defer l1.Shutdown()
+
+	lo2 := DefaultOptions()
+	lo2.LeafNode.Host = "127.0.0.1"
+	lo2.LeafNode.Port = -1
+	lo2.Cluster.Name = "local"
+	lo2.Cluster.Host = "127.0.0.1"
+	lo2.Cluster.Port = -1
+	lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
+	l2 := RunServer(lo2)
+	defer l2.Shutdown()
+
+	checkClusterFormed(t, l1, l2)
+
+	u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
+	urls := []*url.URL{u1}
+
+	ro1 := DefaultOptions()
+	ro1.Cluster.Name = "remote"
+	ro1.Cluster.Host = "127.0.0.1"
+	ro1.Cluster.Port = -1
+	ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
+	ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
+	r1 := RunServer(ro1)
+	defer r1.Shutdown()
+
+	checkLeafNodeConnected(t, r1)
+
+	nc := natsConnect(t, r1.ClientURL(), nats.NoReconnect())
+	defer nc.Close()
+	natsSubSync(t, nc, "foo")
+	natsQueueSubSync(t, nc, "bar", "baz")
+	checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
+	checkSubInterest(t, l2, globalAccountName, "bar", time.Second)
+
+	// Now shutdown the leafnode and check that any subscriptions for $G on l2 are gone.
+	r1.Shutdown()
+	checkFor(t, time.Second, 15*time.Millisecond, func() error {
+		acc := l2.GlobalAccount()
+		if n := acc.TotalSubs(); n != 0 {
+			return fmt.Errorf("Account %q should have 0 sub, got %v", acc.GetName(), n)
+		}
+		return nil
+	})
+}
+
+func TestLeafNodeLoopDetectionWithMultipleClusters(t *testing.T) {
+	lo1 := DefaultOptions()
+	lo1.LeafNode.Host = "127.0.0.1"
+	lo1.LeafNode.Port = -1
+	lo1.Cluster.Name = "local"
+	lo1.Cluster.Host = "127.0.0.1"
+	lo1.Cluster.Port = -1
+	l1 := RunServer(lo1)
+	defer l1.Shutdown()
+
+	lo2 := DefaultOptions()
+	lo2.LeafNode.Host = "127.0.0.1"
+	lo2.LeafNode.Port = -1
+	lo2.Cluster.Name = "local"
+	lo2.Cluster.Host = "127.0.0.1"
+	lo2.Cluster.Port = -1
+	lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
+	l2 := RunServer(lo2)
+	defer l2.Shutdown()
+
+	checkClusterFormed(t, l1, l2)
+
+	u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
+	u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port))
+	urls := []*url.URL{u1, u2}
+
+	ro1 := DefaultOptions()
+	ro1.Cluster.Name = "remote"
+	ro1.Cluster.Host = "127.0.0.1"
+	ro1.Cluster.Port = -1
+	ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
+	ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
+	r1 := RunServer(ro1)
+	defer r1.Shutdown()
+
+	l := &captureErrorLogger{errCh: make(chan string, 100)}
+	r1.SetLogger(l, false, false)
+
+	ro2 := DefaultOptions()
+	ro2.Cluster.Name = "remote"
+	ro2.Cluster.Host = "127.0.0.1"
+	ro2.Cluster.Port = -1
+	ro2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", ro1.Cluster.Port))
+	ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond
+	ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
+	r2 := RunServer(ro2)
+	defer r2.Shutdown()
+
+	checkClusterFormed(t, r1, r2)
+	checkLeafNodeConnected(t, r1)
+	checkLeafNodeConnected(t, r2)
+
+	l1.Shutdown()
+
+	// Now wait for r1 and r2 to reconnect; they should not trigger a loop detection error.
+	checkLeafNodeConnected(t, r1)
+	checkLeafNodeConnected(t, r2)
+
+	// Wait and make sure we don't get a loop error.
+	timeout := time.NewTimer(500 * time.Millisecond)
+	for {
+		select {
+		case err := <-l.errCh:
+			if strings.Contains(err, "Loop detected") {
+				t.Fatal(err)
+			}
+		case <-timeout.C:
+			// OK, we are done.
+			return
+		}
+	}
+}
+
+func TestLeafNodeUnsubOnRouteDisconnect(t *testing.T) {
+	lo1 := DefaultOptions()
+	lo1.LeafNode.Host = "127.0.0.1"
+	lo1.LeafNode.Port = -1
+	lo1.Cluster.Name = "local"
+	lo1.Cluster.Host = "127.0.0.1"
+	lo1.Cluster.Port = -1
+	l1 := RunServer(lo1)
+	defer l1.Shutdown()
+
+	lo2 := DefaultOptions()
+	lo2.LeafNode.Host = "127.0.0.1"
+	lo2.LeafNode.Port = -1
+	lo2.Cluster.Name = "local"
+	lo2.Cluster.Host = "127.0.0.1"
+	lo2.Cluster.Port = -1
+	lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port))
+	l2 := RunServer(lo2)
+	defer l2.Shutdown()
+
+	checkClusterFormed(t, l1, l2)
+
+	u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port))
+	u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port))
+	urls := []*url.URL{u1, u2}
+
+	ro1 := DefaultOptions()
+	// DefaultOptions sets a cluster name, so make sure they are different.
+	// Also, we don't have r1 and r2 clustered in this test, so set port to 0.
+	ro1.Cluster.Name = _EMPTY_
+	ro1.Cluster.Port = 0
+	ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond
+	ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}}
+	r1 := RunServer(ro1)
+	defer r1.Shutdown()
+
+	ro2 := DefaultOptions()
+	ro2.Cluster.Name = _EMPTY_
+	ro2.Cluster.Port = 0
+	ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond
+	// Have this one point only to l2.
+	ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u2}}}
+	r2 := RunServer(ro2)
+	defer r2.Shutdown()
+
+	checkLeafNodeConnected(t, r1)
+	checkLeafNodeConnected(t, r2)
+
+	// Create a subscription on r1.
+	nc := natsConnect(t, r1.ClientURL())
+	defer nc.Close()
+	sub := natsSubSync(t, nc, "foo")
+	natsFlush(t, nc)
+
+	checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
+	checkSubInterest(t, r2, globalAccountName, "foo", time.Second)
+
+	nc2 := natsConnect(t, r2.ClientURL())
+	defer nc2.Close()
+	natsPub(t, nc2, "foo", []byte("msg1"))
+
+	// Check that the message is received.
+	natsNexMsg(t, sub, time.Second)
+
+	// Now shutdown l1; l2 should update subscription interest to r2.
+	// When r1 reconnects to l2, the subscription should be updated too.
+	l1.Shutdown()
+
+	// Wait a bit (so that the interest check does not pass simply because
+	// the broken route has not yet been detected), then check that the
+	// interest is still present on r2 and l2.
+	time.Sleep(100 * time.Millisecond)
+	checkSubInterest(t, l2, globalAccountName, "foo", time.Second)
+	checkSubInterest(t, r2, globalAccountName, "foo", time.Second)
+
+	// Check again that the message is received.
+	natsPub(t, nc2, "foo", []byte("msg2"))
+	natsNexMsg(t, sub, time.Second)
+
+	// Now close the client. Interest should disappear on r2. Due to a bug,
+	// it was not removed.
+	nc.Close()
+
+	checkFor(t, time.Second, 15*time.Millisecond, func() error {
+		acc := r2.GlobalAccount()
+		if n := acc.Interest("foo"); n != 0 {
+			return fmt.Errorf("Still interest on subject: %v", n)
+		}
+		return nil
+	})
+}
diff --git a/server/route.go b/server/route.go
index 5230100c..9b2ee9d0 100644
--- a/server/route.go
+++ b/server/route.go
@@ -891,7 +891,8 @@ func (c *client) removeRemoteSubs() {
 		ase := as[accountName]
 		if ase == nil {
 			if v, ok := srv.accounts.Load(accountName); ok {
-				as[accountName] = &asubs{acc: v.(*Account), subs: []*subscription{sub}}
+				ase = &asubs{acc: v.(*Account), subs: []*subscription{sub}}
+				as[accountName] = ase
 			} else {
 				continue
 			}
@@ -901,6 +902,7 @@ func (c *client) removeRemoteSubs() {
 		if srv.gateway.enabled {
 			srv.gatewayUpdateSubInterest(accountName, sub, -1)
 		}
+		srv.updateLeafNodes(ase.acc, sub, -1)
 	}
 
 	// Now remove the subs by batch for each account sublist.
@@ -1077,9 +1079,9 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
 	// We store local subs by account and subject and optionally queue name.
 	// If we have a queue it will have a trailing weight which we do not want.
 	if sub.queue != nil {
-		sub.sid = arg[:len(arg)-len(args[3+off])-1]
+		sub.sid = arg[len(sub.origin)+off : len(arg)-len(args[3+off])-1]
 	} else {
-		sub.sid = arg
+		sub.sid = arg[len(sub.origin)+off:]
 	}
 	key := string(sub.sid)
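-- 
Reviewer note (not part of the patch): the leafnode.go hunk reduces to a
single filtering rule applied while building the subscription map sent to a
connecting leafnode. Below is a minimal, standalone sketch of that rule;
the subscription struct and shouldForward helper here are simplified
stand-ins for illustration, not the server's actual types or API.

package main

import "fmt"

// Simplified stand-in for the server's subscription: origin holds the name
// of the cluster the subscription was learned from, nil if it is local.
type subscription struct {
	origin  []byte
	subject string
}

// shouldForward mirrors the predicate added in initLeafNodeSmapAndSendSubs:
// skip a subscription whose origin cluster matches the remote cluster of
// the leafnode we are sending to, so a reconnecting leafnode is never sent
// back its own subscriptions (which is what triggered the bogus loop
// detection).
func shouldForward(sub *subscription, remoteCluster string) bool {
	return sub.origin == nil || string(sub.origin) != remoteCluster
}

func main() {
	subs := []*subscription{
		{origin: nil, subject: "foo"},              // local sub: forward
		{origin: []byte("remote"), subject: "bar"}, // from "remote": skip
		{origin: []byte("other"), subject: "baz"},  // other cluster: forward
	}
	for _, s := range subs {
		fmt.Printf("%s: forward=%v\n", s.subject, shouldForward(s, "remote"))
	}
}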
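The route.go change to processRemoteSub is similar in spirit: the sid key
must be derived from the protocol arguments with the optional origin-cluster
prefix stripped, otherwise the key registered on RS+ never matches the key
used when the subscription is removed. A minimal sketch of that slicing
follows; sidFromArg and its argument layout ("[origin ]account subject
[queue weight]") are hypothetical simplifications, not the server's actual
parsing code.

package main

import "fmt"

// sidFromArg derives the sid from the raw argument bytes: skip the optional
// leading "origin " token and, for queue subs, drop the trailing " weight"
// token. Keeping the origin prefix (the old behavior) produced a key that
// could never be matched at unsubscribe time.
func sidFromArg(arg, origin, weight []byte) []byte {
	start := 0
	if len(origin) > 0 {
		start = len(origin) + 1 // +1 for the space after the origin token
	}
	if len(weight) > 0 {
		return arg[start : len(arg)-len(weight)-1]
	}
	return arg[start:]
}

func main() {
	// Queue sub with an origin: "ORIGIN $G foo bar 1" -> "$G foo bar"
	fmt.Printf("%q\n", sidFromArg([]byte("ORIGIN $G foo bar 1"), []byte("ORIGIN"), []byte("1")))
	// Plain sub without an origin: "$G foo" -> "$G foo"
	fmt.Printf("%q\n", sidFromArg([]byte("$G foo"), nil, nil))
}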