diff --git a/server/leafnode.go b/server/leafnode.go index 6adc0186..19583e30 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1256,10 +1256,13 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { // Now walk the results and add them to our smap c.mu.Lock() + rc := c.leaf.remoteCluster c.leaf.smap = make(map[string]int32) for _, sub := range subs { // We ignore ourselves here. - if c != sub.client { + // Also don't add the subscription if it has a origin cluster and the + // cluster name matches the one of the client we are sending to. + if c != sub.client && (sub.origin == nil || (string(sub.origin) != rc)) { c.leaf.smap[keyFromSub(sub)]++ if c.leaf.tsub == nil { c.leaf.tsub = make(map[*subscription]struct{}) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 61fad5d4..0a69b490 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -3066,3 +3066,222 @@ func TestLeafNodeStreamImport(t *testing.T) { natsNexMsg(t, sub, time.Second) } + +func TestLeafNodeRouteSubWithOrigin(t *testing.T) { + lo1 := DefaultOptions() + lo1.LeafNode.Host = "127.0.0.1" + lo1.LeafNode.Port = -1 + lo1.Cluster.Name = "local" + lo1.Cluster.Host = "127.0.0.1" + lo1.Cluster.Port = -1 + l1 := RunServer(lo1) + defer l1.Shutdown() + + lo2 := DefaultOptions() + lo2.LeafNode.Host = "127.0.0.1" + lo2.LeafNode.Port = -1 + lo2.Cluster.Name = "local" + lo2.Cluster.Host = "127.0.0.1" + lo2.Cluster.Port = -1 + lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port)) + l2 := RunServer(lo2) + defer l2.Shutdown() + + checkClusterFormed(t, l1, l2) + + u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port)) + urls := []*url.URL{u1} + + ro1 := DefaultOptions() + ro1.Cluster.Name = "remote" + ro1.Cluster.Host = "127.0.0.1" + ro1.Cluster.Port = -1 + ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond + ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}} + r1 := RunServer(ro1) + defer r1.Shutdown() + + 
checkLeafNodeConnected(t, r1) + + nc := natsConnect(t, r1.ClientURL(), nats.NoReconnect()) + defer nc.Close() + natsSubSync(t, nc, "foo") + natsQueueSubSync(t, nc, "bar", "baz") + checkSubInterest(t, l2, globalAccountName, "foo", time.Second) + checkSubInterest(t, l2, globalAccountName, "bar", time.Second) + + // Now shutdown the leafnode and check that any subscription for $G on l2 are gone. + r1.Shutdown() + checkFor(t, time.Second, 15*time.Millisecond, func() error { + acc := l2.GlobalAccount() + if n := acc.TotalSubs(); n != 0 { + return fmt.Errorf("Account %q should have 0 sub, got %v", acc.GetName(), n) + } + return nil + }) +} + +func TestLeafNodeLoopDetectionWithMultipleClusters(t *testing.T) { + lo1 := DefaultOptions() + lo1.LeafNode.Host = "127.0.0.1" + lo1.LeafNode.Port = -1 + lo1.Cluster.Name = "local" + lo1.Cluster.Host = "127.0.0.1" + lo1.Cluster.Port = -1 + l1 := RunServer(lo1) + defer l1.Shutdown() + + lo2 := DefaultOptions() + lo2.LeafNode.Host = "127.0.0.1" + lo2.LeafNode.Port = -1 + lo2.Cluster.Name = "local" + lo2.Cluster.Host = "127.0.0.1" + lo2.Cluster.Port = -1 + lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port)) + l2 := RunServer(lo2) + defer l2.Shutdown() + + checkClusterFormed(t, l1, l2) + + u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port)) + u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port)) + urls := []*url.URL{u1, u2} + + ro1 := DefaultOptions() + ro1.Cluster.Name = "remote" + ro1.Cluster.Host = "127.0.0.1" + ro1.Cluster.Port = -1 + ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond + ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}} + r1 := RunServer(ro1) + defer r1.Shutdown() + + l := &captureErrorLogger{errCh: make(chan string, 100)} + r1.SetLogger(l, false, false) + + ro2 := DefaultOptions() + ro2.Cluster.Name = "remote" + ro2.Cluster.Host = "127.0.0.1" + ro2.Cluster.Port = -1 + ro2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", 
ro1.Cluster.Port)) + ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond + ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}} + r2 := RunServer(ro2) + defer r2.Shutdown() + + checkClusterFormed(t, r1, r2) + checkLeafNodeConnected(t, r1) + checkLeafNodeConnected(t, r2) + + l1.Shutdown() + + // Now wait for r1 and r2 to reconnect, they should not have a problem of loop detection. + checkLeafNodeConnected(t, r1) + checkLeafNodeConnected(t, r2) + + // Wait and make sure we don't have a loop error + timeout := time.NewTimer(500 * time.Millisecond) + for { + select { + case err := <-l.errCh: + if strings.Contains(err, "Loop detected") { + t.Fatal(err) + } + case <-timeout.C: + // OK, we are done. + return + } + } +} + +func TestLeafNodeUnsubOnRouteDisconnect(t *testing.T) { + lo1 := DefaultOptions() + lo1.LeafNode.Host = "127.0.0.1" + lo1.LeafNode.Port = -1 + lo1.Cluster.Name = "local" + lo1.Cluster.Host = "127.0.0.1" + lo1.Cluster.Port = -1 + l1 := RunServer(lo1) + defer l1.Shutdown() + + lo2 := DefaultOptions() + lo2.LeafNode.Host = "127.0.0.1" + lo2.LeafNode.Port = -1 + lo2.Cluster.Name = "local" + lo2.Cluster.Host = "127.0.0.1" + lo2.Cluster.Port = -1 + lo2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo1.Cluster.Port)) + l2 := RunServer(lo2) + defer l2.Shutdown() + + checkClusterFormed(t, l1, l2) + + u1, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo1.LeafNode.Port)) + u2, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", lo2.LeafNode.Port)) + urls := []*url.URL{u1, u2} + + ro1 := DefaultOptions() + // DefaultOptions sets a cluster name, so make sure they are different. + // Also, we don't have r1 and r2 clustered in this test, so set port to 0. 
+ ro1.Cluster.Name = _EMPTY_ + ro1.Cluster.Port = 0 + ro1.LeafNode.ReconnectInterval = 50 * time.Millisecond + ro1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: urls}} + r1 := RunServer(ro1) + defer r1.Shutdown() + + ro2 := DefaultOptions() + ro2.Cluster.Name = _EMPTY_ + ro2.Cluster.Port = 0 + ro2.LeafNode.ReconnectInterval = 50 * time.Millisecond + // Have this one point only to l2 + ro2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u2}}} + r2 := RunServer(ro2) + defer r2.Shutdown() + + checkLeafNodeConnected(t, r1) + checkLeafNodeConnected(t, r2) + + // Create a subscription on r1. + nc := natsConnect(t, r1.ClientURL()) + defer nc.Close() + sub := natsSubSync(t, nc, "foo") + natsFlush(t, nc) + + checkSubInterest(t, l2, globalAccountName, "foo", time.Second) + checkSubInterest(t, r2, globalAccountName, "foo", time.Second) + + nc2 := natsConnect(t, r2.ClientURL()) + defer nc2.Close() + natsPub(t, nc, "foo", []byte("msg1")) + + // Check message received + natsNexMsg(t, sub, time.Second) + + // Now shutdown l1, l2 should update subscription interest to r2. + // When r1 reconnects to l2, subscription should be updated too. + l1.Shutdown() + + // Wait a bit (so that the check of interest is not OK just because + // the route would not have been yet detected as broken), and check + // interest still present on r2, l2. + time.Sleep(100 * time.Millisecond) + checkSubInterest(t, l2, globalAccountName, "foo", time.Second) + checkSubInterest(t, r2, globalAccountName, "foo", time.Second) + + // Check again that message received ok + natsPub(t, nc, "foo", []byte("msg2")) + natsNexMsg(t, sub, time.Second) + + // Now close client. Interest should disappear on r2. Due to a bug, + // it was not. 
+ nc.Close() + + checkFor(t, time.Second, 15*time.Millisecond, func() error { + acc := r2.GlobalAccount() + if n := acc.Interest("foo"); n != 0 { + return fmt.Errorf("Still interest on subject: %v", n) + } + return nil + }) +} diff --git a/server/route.go b/server/route.go index 5230100c..9b2ee9d0 100644 --- a/server/route.go +++ b/server/route.go @@ -891,7 +891,8 @@ func (c *client) removeRemoteSubs() { ase := as[accountName] if ase == nil { if v, ok := srv.accounts.Load(accountName); ok { - as[accountName] = &asubs{acc: v.(*Account), subs: []*subscription{sub}} + ase = &asubs{acc: v.(*Account), subs: []*subscription{sub}} + as[accountName] = ase } else { continue } @@ -901,6 +902,7 @@ func (c *client) removeRemoteSubs() { if srv.gateway.enabled { srv.gatewayUpdateSubInterest(accountName, sub, -1) } + srv.updateLeafNodes(ase.acc, sub, -1) } // Now remove the subs by batch for each account sublist. @@ -1077,9 +1079,9 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { // We store local subs by account and subject and optionally queue name. // If we have a queue it will have a trailing weight which we do not want. if sub.queue != nil { - sub.sid = arg[:len(arg)-len(args[3+off])-1] + sub.sid = arg[len(sub.origin)+off : len(arg)-len(args[3+off])-1] } else { - sub.sid = arg + sub.sid = arg[len(sub.origin)+off:] } key := string(sub.sid)